Skip to content

Commit

Permalink
Add prealloc timeseries v2
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 7, 2024
1 parent 6778f49 commit 6204558
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 12 deletions.
6 changes: 3 additions & 3 deletions pkg/cortexpb/slicesPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type byteSlicePools struct {
pools []sync.Pool
}

func newSlicePool(pools int) *byteSlicePools {
func NewSlicePool(pools int) *byteSlicePools {
sp := byteSlicePools{}
sp.init(pools)
return &sp
Expand All @@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) {
}
}

func (sp *byteSlicePools) getSlice(size int) *[]byte {
func (sp *byteSlicePools) GetSlice(size int) *[]byte {
index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower

if index >= len(sp.pools) {
Expand All @@ -50,7 +50,7 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte {
return s
}

func (sp *byteSlicePools) reuseSlice(s *[]byte) {
func (sp *byteSlicePools) ReuseSlice(s *[]byte) {
index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower

if index >= len(sp.pools) || index < 0 {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cortexpb/slicesPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
)

func TestFuzzyByteSlicePools(t *testing.T) {
sut := newSlicePool(20)
sut := NewSlicePool(20)
maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))

for i := 0; i < 1000; i++ {
size := rand.Int() % maxByteSize
s := sut.getSlice(size)
s := sut.GetSlice(size)
assert.Equal(t, len(*s), size)
sut.reuseSlice(s)
sut.ReuseSlice(s)
}
}

func TestReturnSliceSmallerThanMin(t *testing.T) {
sut := newSlicePool(20)
sut := NewSlicePool(20)
size := 3
buff := make([]byte, 0, size)
sut.reuseSlice(&buff)
buff2 := sut.getSlice(size * 2)
sut.ReuseSlice(&buff)
buff2 := sut.GetSlice(size * 2)
assert.Equal(t, len(*buff2), size*2)
}
6 changes: 3 additions & 3 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
}
},
}
bytePool = newSlicePool(20)
bytePool = NewSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
Expand Down Expand Up @@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {

func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePool.getSlice(size)
p.data = bytePool.GetSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
Expand All @@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {

func ReuseWriteRequest(req *PreallocWriteRequest) {
if req.data != nil {
bytePool.reuseSlice(req.data)
bytePool.ReuseSlice(req.data)
req.data = nil
}
req.Source = 0
Expand Down
136 changes: 136 additions & 0 deletions pkg/cortexpbv2/timeseriesv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package cortexpbv2

import (
"sync"

"github.com/cortexproject/cortex/pkg/cortexpb"
)

var (
expectedTimeseries = 100
expectedLabels = 20
expectedSymbols = 20
expectedSamplesPerSeries = 10
expectedExemplarsPerSeries = 1
expectedHistogramsPerSeries = 1

slicePool = sync.Pool{
New: func() interface{} {
return make([]PreallocTimeseriesV2, 0, expectedTimeseries)
},
}

timeSeriesPool = sync.Pool{
New: func() interface{} {
return &TimeSeries{
LabelsRefs: make([]uint32, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
Metadata: Metadata{},
}
},
}

writeRequestPool = sync.Pool{
New: func() interface{} {
return &PreallocWriteRequestV2{
WriteRequest: WriteRequest{
Symbols: make([]string, 0, expectedSymbols),
},
}
},
}
bytePool = cortexpb.NewSlicePool(20)
)

// PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal.
type PreallocWriteRequestV2 struct {
WriteRequest
data *[]byte
}

// Unmarshal implements proto.Message.
func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error {
p.Timeseries = PreallocTimeseriesV2SliceFromPool()
return p.WriteRequest.Unmarshal(dAtA)
}

func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePool.GetSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}

// PreallocTimeseriesV2 is a TimeSeries which preallocs slices on Unmarshal.
type PreallocTimeseriesV2 struct {
*TimeSeries
}

// Unmarshal implements proto.Message.
func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error {
p.TimeSeries = TimeseriesV2FromPool()
return p.TimeSeries.Unmarshal(dAtA)
}

func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
if req.data != nil {
bytePool.ReuseSlice(req.data)
req.data = nil
}
req.Source = 0
req.Symbols = nil
req.Timeseries = nil
writeRequestPool.Put(req)
}

func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
return writeRequestPool.Get().(*PreallocWriteRequestV2)
}

// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
// ReuseSlice should be called once done.
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
return slicePool.Get().([]PreallocTimeseriesV2)
}

// ReuseSlice puts the slice back into a sync.Pool for reuse.
func ReuseSlice(ts []PreallocTimeseriesV2) {
for i := range ts {
ReuseTimeseries(ts[i].TimeSeries)
}

slicePool.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details
}

// TimeseriesV2FromPool retrieves a pointer to a TimeSeries from a sync.Pool.
// ReuseTimeseries should be called once done, unless ReuseSlice was called on the slice that contains this TimeSeries.
func TimeseriesV2FromPool() *TimeSeries {
return timeSeriesPool.Get().(*TimeSeries)
}

// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse.
func ReuseTimeseries(ts *TimeSeries) {
// clear ts lableRef and samples
ts.LabelsRefs = ts.LabelsRefs[:0]
ts.Samples = ts.Samples[:0]

// clear exmplar labelrefs
for i := range ts.Exemplars {
ts.Exemplars[i].LabelsRefs = ts.Exemplars[i].LabelsRefs[:0]
}

for i := range ts.Histograms {
ts.Histograms[i].Reset()
}

ts.Exemplars = ts.Exemplars[:0]
ts.Histograms = ts.Histograms[:0]
ts.Metadata = Metadata{}
timeSeriesPool.Put(ts)
}
111 changes: 111 additions & 0 deletions pkg/cortexpbv2/timeseriesv2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package cortexpbv2

import (
"fmt"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPreallocTimeseriesV2SliceFromPool(t *testing.T) {
t.Run("new instance is provided when not available to reuse", func(t *testing.T) {
first := PreallocTimeseriesV2SliceFromPool()
second := PreallocTimeseriesV2SliceFromPool()

assert.NotSame(t, first, second)
})

t.Run("instance is cleaned before reusing", func(t *testing.T) {
slice := PreallocTimeseriesV2SliceFromPool()
slice = append(slice, PreallocTimeseriesV2{TimeSeries: &TimeSeries{}})
ReuseSlice(slice)

reused := PreallocTimeseriesV2SliceFromPool()
assert.Len(t, reused, 0)
})
}

func TestTimeseriesV2FromPool(t *testing.T) {
t.Run("new instance is provided when not available to reuse", func(t *testing.T) {
first := TimeseriesV2FromPool()
second := TimeseriesV2FromPool()

assert.NotSame(t, first, second)
})

t.Run("instance is cleaned before reusing", func(t *testing.T) {
ts := TimeseriesV2FromPool()
ts.LabelsRefs = []uint32{1, 2}
ts.Samples = []Sample{{Value: 1, Timestamp: 2}}
ts.Exemplars = []Exemplar{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}}
ts.Histograms = []Histogram{{}}
fmt.Println("ts.Histograms", len(ts.Histograms))
ReuseTimeseries(ts)

reused := TimeseriesV2FromPool()
assert.Len(t, reused.LabelsRefs, 0)
assert.Len(t, reused.Samples, 0)
assert.Len(t, reused.Exemplars, 0)
assert.Len(t, reused.Histograms, 0)
})
}

func BenchmarkMarshallWriteRequest(b *testing.B) {
ts := PreallocTimeseriesV2SliceFromPool()

for i := 0; i < 100; i++ {
ts = append(ts, PreallocTimeseriesV2{TimeSeries: TimeseriesV2FromPool()})
ts[i].LabelsRefs = []uint32{1, 2, 3, 4, 5, 6, 7, 8}
ts[i].Samples = []Sample{{Value: 1, Timestamp: 2}}
}

tests := []struct {
name string
writeRequestFactory func() proto.Marshaler
clean func(in interface{})
}{
{
name: "no-pool",
writeRequestFactory: func() proto.Marshaler {
return &WriteRequest{Timeseries: ts}
},
clean: func(in interface{}) {},
},
{
name: "byte pool",
writeRequestFactory: func() proto.Marshaler {
w := &PreallocWriteRequestV2{}
w.Timeseries = ts
return w
},
clean: func(in interface{}) {
ReuseWriteRequestV2(in.(*PreallocWriteRequestV2))
},
},
{
name: "byte and write pool",
writeRequestFactory: func() proto.Marshaler {
w := PreallocWriteRequestV2FromPool()
w.Timeseries = ts
return w
},
clean: func(in interface{}) {
ReuseWriteRequestV2(in.(*PreallocWriteRequestV2))
},
},
}

for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
w := tc.writeRequestFactory()
_, err := w.Marshal()
require.NoError(b, err)
tc.clean(w)
}
b.ReportAllocs()
})
}
}

0 comments on commit 6204558

Please sign in to comment.