-
Notifications
You must be signed in to change notification settings - Fork 806
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: SungJin1212 <[email protected]>
- Loading branch information
1 parent
722101d
commit 6e0054a
Showing
6 changed files
with
260 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package cortexpbv2 | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/cortexproject/cortex/pkg/cortexpb" | ||
) | ||
|
||
var ( | ||
expectedTimeseries = 100 | ||
expectedLabels = 20 | ||
expectedSymbols = 20 | ||
expectedSamplesPerSeries = 10 | ||
expectedExemplarsPerSeries = 1 | ||
expectedHistogramsPerSeries = 1 | ||
|
||
slicePool = sync.Pool{ | ||
New: func() interface{} { | ||
return make([]PreallocTimeseriesV2, 0, expectedTimeseries) | ||
}, | ||
} | ||
|
||
timeSeriesPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &TimeSeries{ | ||
LabelsRefs: make([]uint32, 0, expectedLabels), | ||
Samples: make([]Sample, 0, expectedSamplesPerSeries), | ||
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries), | ||
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries), | ||
Metadata: Metadata{}, | ||
} | ||
}, | ||
} | ||
|
||
writeRequestPool = sync.Pool{ | ||
New: func() interface{} { | ||
return &PreallocWriteRequestV2{ | ||
WriteRequest: WriteRequest{ | ||
Symbols: make([]string, 0, expectedSymbols), | ||
}, | ||
} | ||
}, | ||
} | ||
bytePool = cortexpb.NewSlicePool(20) | ||
) | ||
|
||
// PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal. | ||
type PreallocWriteRequestV2 struct { | ||
WriteRequest | ||
data *[]byte | ||
} | ||
|
||
// Unmarshal implements proto.Message. | ||
func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error { | ||
p.Timeseries = PreallocTimeseriesV2SliceFromPool() | ||
return p.WriteRequest.Unmarshal(dAtA) | ||
} | ||
|
||
func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) { | ||
size := p.Size() | ||
p.data = bytePool.GetSlice(size) | ||
dAtA = *p.data | ||
n, err := p.MarshalToSizedBuffer(dAtA[:size]) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return dAtA[:n], nil | ||
} | ||
|
||
// PreallocTimeseriesV2 is a TimeSeries which preallocs slices on Unmarshal. | ||
type PreallocTimeseriesV2 struct { | ||
*TimeSeries | ||
} | ||
|
||
// Unmarshal implements proto.Message. | ||
func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error { | ||
p.TimeSeries = TimeseriesV2FromPool() | ||
return p.TimeSeries.Unmarshal(dAtA) | ||
} | ||
|
||
func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { | ||
if req.data != nil { | ||
bytePool.ReuseSlice(req.data) | ||
req.data = nil | ||
} | ||
req.Source = 0 | ||
req.Symbols = nil | ||
req.Timeseries = nil | ||
writeRequestPool.Put(req) | ||
} | ||
|
||
func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 { | ||
return writeRequestPool.Get().(*PreallocWriteRequestV2) | ||
} | ||
|
||
// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool. | ||
// ReuseSlice should be called once done. | ||
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 { | ||
return slicePool.Get().([]PreallocTimeseriesV2) | ||
} | ||
|
||
// ReuseSlice puts the slice back into a sync.Pool for reuse. | ||
func ReuseSlice(ts []PreallocTimeseriesV2) { | ||
for i := range ts { | ||
ReuseTimeseries(ts[i].TimeSeries) | ||
} | ||
|
||
slicePool.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details | ||
} | ||
|
||
// TimeseriesV2FromPool retrieves a pointer to a TimeSeries from a sync.Pool. | ||
// ReuseTimeseries should be called once done, unless ReuseSlice was called on the slice that contains this TimeSeries. | ||
func TimeseriesV2FromPool() *TimeSeries { | ||
return timeSeriesPool.Get().(*TimeSeries) | ||
} | ||
|
||
// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse. | ||
func ReuseTimeseries(ts *TimeSeries) { | ||
// clear ts lableRef and samples | ||
ts.LabelsRefs = ts.LabelsRefs[:0] | ||
ts.Samples = ts.Samples[:0] | ||
|
||
// clear exmplar labelrefs | ||
for i := range ts.Exemplars { | ||
ts.Exemplars[i].LabelsRefs = ts.Exemplars[i].LabelsRefs[:0] | ||
} | ||
|
||
for i := range ts.Histograms { | ||
ts.Histograms[i].Reset() | ||
} | ||
|
||
ts.Exemplars = ts.Exemplars[:0] | ||
ts.Histograms = ts.Histograms[:0] | ||
ts.Metadata = Metadata{} | ||
timeSeriesPool.Put(ts) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package cortexpbv2 | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestPreallocTimeseriesV2SliceFromPool(t *testing.T) { | ||
t.Run("new instance is provided when not available to reuse", func(t *testing.T) { | ||
first := PreallocTimeseriesV2SliceFromPool() | ||
second := PreallocTimeseriesV2SliceFromPool() | ||
|
||
assert.NotSame(t, first, second) | ||
}) | ||
|
||
t.Run("instance is cleaned before reusing", func(t *testing.T) { | ||
slice := PreallocTimeseriesV2SliceFromPool() | ||
slice = append(slice, PreallocTimeseriesV2{TimeSeries: &TimeSeries{}}) | ||
ReuseSlice(slice) | ||
|
||
reused := PreallocTimeseriesV2SliceFromPool() | ||
assert.Len(t, reused, 0) | ||
}) | ||
} | ||
|
||
func TestTimeseriesV2FromPool(t *testing.T) { | ||
t.Run("new instance is provided when not available to reuse", func(t *testing.T) { | ||
first := TimeseriesV2FromPool() | ||
second := TimeseriesV2FromPool() | ||
|
||
assert.NotSame(t, first, second) | ||
}) | ||
|
||
t.Run("instance is cleaned before reusing", func(t *testing.T) { | ||
ts := TimeseriesV2FromPool() | ||
ts.LabelsRefs = []uint32{1, 2} | ||
ts.Samples = []Sample{{Value: 1, Timestamp: 2}} | ||
ts.Exemplars = []Exemplar{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}} | ||
ts.Histograms = []Histogram{{}} | ||
fmt.Println("ts.Histograms", len(ts.Histograms)) | ||
ReuseTimeseries(ts) | ||
|
||
reused := TimeseriesV2FromPool() | ||
assert.Len(t, reused.LabelsRefs, 0) | ||
assert.Len(t, reused.Samples, 0) | ||
assert.Len(t, reused.Exemplars, 0) | ||
assert.Len(t, reused.Histograms, 0) | ||
}) | ||
} | ||
|
||
func BenchmarkMarshallWriteRequest(b *testing.B) { | ||
ts := PreallocTimeseriesV2SliceFromPool() | ||
|
||
for i := 0; i < 100; i++ { | ||
ts = append(ts, PreallocTimeseriesV2{TimeSeries: TimeseriesV2FromPool()}) | ||
ts[i].LabelsRefs = []uint32{1, 2, 3, 4, 5, 6, 7, 8} | ||
ts[i].Samples = []Sample{{Value: 1, Timestamp: 2}} | ||
} | ||
|
||
tests := []struct { | ||
name string | ||
writeRequestFactory func() proto.Marshaler | ||
clean func(in interface{}) | ||
}{ | ||
{ | ||
name: "no-pool", | ||
writeRequestFactory: func() proto.Marshaler { | ||
return &WriteRequest{Timeseries: ts} | ||
}, | ||
clean: func(in interface{}) {}, | ||
}, | ||
{ | ||
name: "byte pool", | ||
writeRequestFactory: func() proto.Marshaler { | ||
w := &PreallocWriteRequestV2{} | ||
w.Timeseries = ts | ||
return w | ||
}, | ||
clean: func(in interface{}) { | ||
ReuseWriteRequestV2(in.(*PreallocWriteRequestV2)) | ||
}, | ||
}, | ||
{ | ||
name: "byte and write pool", | ||
writeRequestFactory: func() proto.Marshaler { | ||
w := PreallocWriteRequestV2FromPool() | ||
w.Timeseries = ts | ||
return w | ||
}, | ||
clean: func(in interface{}) { | ||
ReuseWriteRequestV2(in.(*PreallocWriteRequestV2)) | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range tests { | ||
b.Run(tc.name, func(b *testing.B) { | ||
for i := 0; i < b.N; i++ { | ||
w := tc.writeRequestFactory() | ||
_, err := w.Marshal() | ||
require.NoError(b, err) | ||
tc.clean(w) | ||
} | ||
b.ReportAllocs() | ||
}) | ||
} | ||
} |