Skip to content

Commit

Permalink
add new histogram field for full native histogram representation
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Sep 9, 2024
1 parent bc69e73 commit f231dd8
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 125 deletions.
32 changes: 31 additions & 1 deletion pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,18 @@ func mergeSampleStreams(output map[string]SampleStream, sampleStreams []SampleSt
stream.Histograms = sliceHistograms(stream.Histograms, existingEndTs)
}
}
// Same for above.
if len(existing.RawHistograms) > 0 && len(stream.RawHistograms) > 0 {
existingEndTs := existing.RawHistograms[len(existing.RawHistograms)-1].GetTimestampMs()
if existingEndTs == stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = stream.RawHistograms[1:]
} else if existingEndTs > stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = sliceRawHistograms(stream.RawHistograms, existingEndTs)
}
}
existing.Samples = append(existing.Samples, stream.Samples...)
existing.Histograms = append(existing.Histograms, stream.Histograms...)
existing.RawHistograms = append(existing.RawHistograms, stream.RawHistograms...)

output[metric] = existing
}
Expand Down Expand Up @@ -396,7 +406,27 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceHistograms(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
func sliceHistograms(histograms []*SampleHistogramPair, minTs int64) []*SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestampMs() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestampMs() > minTs
})

return histograms[searchResult:]
}

// sliceRawHistograms assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceRawHistograms(histograms []*cortexpb.Histogram, minTs int64) []*cortexpb.Histogram {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}
Expand Down
60 changes: 30 additions & 30 deletions pkg/querier/tripperware/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
},
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -121,7 +121,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -132,7 +132,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand Down Expand Up @@ -175,15 +175,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -193,7 +193,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -212,7 +212,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -225,7 +225,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 1, TimestampMs: 1},
{Value: 4, TimestampMs: 4},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -241,7 +241,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 3, TimestampMs: 3},
{Value: 4, TimestampMs: 4},
},
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand Down Expand Up @@ -309,30 +309,30 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 1},
{Histogram: testHistogram2, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 2},
{Histogram: testHistogram2, TimestampMs: 3},
},
Expand All @@ -341,7 +341,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -350,7 +350,7 @@ func TestMergeSampleStreams(t *testing.T) {
},
ingester_client.LabelsToKeyString(lbls1): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []SampleHistogramPair{
Histograms: []*SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand Down Expand Up @@ -452,21 +452,21 @@ func TestSliceHistograms(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
histograms []SampleHistogramPair
histograms []*SampleHistogramPair
minTs int64
expectedHistograms []SampleHistogramPair
expectedHistograms []*SampleHistogramPair
}{
{name: "empty histograms"},
{
name: "minTs smaller than first histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
},
},
minTs: 0,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -475,7 +475,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are not sorted, return all histograms",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -486,7 +486,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -499,7 +499,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "minTs greater than the last histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -510,11 +510,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 3,
expectedHistograms: []SampleHistogramPair{},
expectedHistograms: []*SampleHistogramPair{},
},
{
name: "input histograms not sorted, minTs greater than the last histogram's timestamp",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 0,
Histogram: testHistogram1,
Expand All @@ -529,11 +529,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{},
expectedHistograms: []*SampleHistogramPair{},
},
{
name: "input histograms are sorted",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -548,7 +548,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 1,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -565,7 +565,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are sorted, get sliced histograms",
histograms: []SampleHistogramPair{
histograms: []*SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -580,7 +580,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []SampleHistogramPair{
expectedHistograms: []*SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
for iter.ReadArray() {
h := SampleHistogramPair{}
UnmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, h)
ss.Histograms = append(ss.Histograms, &h)
}
default:
iter.ReportError("unmarshal SampleStream", fmt.Sprint("unexpected key:", field))
Expand Down Expand Up @@ -309,7 +309,7 @@ func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
if i > 0 {
stream.WriteMore()
}
MarshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
MarshalSampleHistogramPairJSON(unsafe.Pointer(h), stream)
}
stream.WriteArrayEnd()
}
Expand Down
Loading

0 comments on commit f231dd8

Please sign in to comment.