Skip to content

Commit

Permalink
Merge branch 'main' into fix/sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
amaury-d authored Sep 19, 2024
2 parents 0f7ce6b + 1625665 commit 600a298
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 48 deletions.
210 changes: 210 additions & 0 deletions docs/blog/2023-11-20-life-of-a-sample-part-1.md

Large diffs are not rendered by default.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/blog/img/life-of-a-sample/remote-write.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
67 changes: 34 additions & 33 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,52 @@ import (
"github.com/pkg/errors"
)

// Bytes is a pool of bytes that can be reused.
type Bytes interface {
// Get returns a new byte slices that fits the given size.
Get(sz int) (*[]byte, error)
// Put returns a byte slice to the right bucket in the pool.
Put(b *[]byte)
// Pool is a pool for slices of type T that can be reused.
type Pool[T any] interface {
// Get returns a new T slice that fits the given size.
Get(sz int) (*[]T, error)
// Put returns a T slice to the right bucket in the pool.
Put(b *[]T)
}

// NoopBytes is pool that always allocated required slice on heap and ignore puts.
type NoopBytes struct{}
// NoopPool is pool that always allocated required slice on heap and ignore puts.
type NoopPool[T any] struct{}

func (p NoopBytes) Get(sz int) (*[]byte, error) {
b := make([]byte, 0, sz)
func (p NoopPool[T]) Get(sz int) (*[]T, error) {
b := make([]T, 0, sz)
return &b, nil
}

func (p NoopBytes) Put(*[]byte) {}
func (p NoopPool[T]) Put(*[]T) {}

// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow
// more than a maximum number of bytes being used at a given time.
// Every byte slice obtained from the pool must be returned.
type BucketedBytes struct {
// BucketedPool is a bucketed pool for variably sized T slices. It can be
// configured to not allow more than a maximum number of T items being used at a
// given time. Every slice obtained from the pool must be returned.
type BucketedPool[T any] struct {
buckets []sync.Pool
sizes []int
maxTotal uint64
usedTotal uint64
mtx sync.RWMutex

new func(s int) *[]byte
new func(s int) *[]T
}

// MustNewBucketedBytes is like NewBucketedBytes but panics if construction fails.
// MustNewBucketedPool is like NewBucketedPool but panics if construction fails.
// Useful for package internal pools.
func MustNewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) *BucketedBytes {
pool, err := NewBucketedBytes(minSize, maxSize, factor, maxTotal)
func MustNewBucketedPool[T any](minSize, maxSize int, factor float64, maxTotal uint64) *BucketedPool[T] {
pool, err := NewBucketedPool[T](minSize, maxSize, factor, maxTotal)
if err != nil {
panic(err)
}
return pool
}

// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize
// increasing by the given factor and maximum number of used bytes.
// No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0.
func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) {
// NewBucketedPool returns a new BucketedPool with size buckets for minSize to
// maxSize increasing by the given factor and maximum number of used items. No
// more than maxTotal items can be used at any given time unless maxTotal is set
// to 0.
func NewBucketedPool[T any](minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedPool[T], error) {
if minSize < 1 {
return nil, errors.New("invalid minimum pool size")
}
Expand All @@ -69,23 +70,23 @@ func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*B
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &BucketedBytes{
p := &BucketedPool[T]{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
maxTotal: maxTotal,
new: func(sz int) *[]byte {
s := make([]byte, 0, sz)
new: func(sz int) *[]T {
s := make([]T, 0, sz)
return &s
},
}
return p, nil
}

// ErrPoolExhausted is returned if a pool cannot provide the request bytes.
// ErrPoolExhausted is returned if a pool cannot provide the requested slice.
var ErrPoolExhausted = errors.New("pool exhausted")

// Get returns a new byte slice that fits the given size.
func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
// Get returns a slice into from the bucket that fits the given size.
func (p *BucketedPool[T]) Get(sz int) (*[]T, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand All @@ -97,7 +98,7 @@ func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
if sz > bktSize {
continue
}
b, ok := p.buckets[i].Get().(*[]byte)
b, ok := p.buckets[i].Get().(*[]T)
if !ok {
b = p.new(bktSize)
}
Expand All @@ -111,8 +112,8 @@ func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
return p.new(sz), nil
}

// Put returns a byte slice to the right bucket in the pool.
func (p *BucketedBytes) Put(b *[]byte) {
// Put returns a slice to the right bucket in the pool.
func (p *BucketedPool[T]) Put(b *[]T) {
if b == nil {
return
}
Expand All @@ -138,7 +139,7 @@ func (p *BucketedBytes) Put(b *[]byte) {
}
}

func (p *BucketedBytes) UsedBytes() uint64 {
func (p *BucketedPool[T]) UsedBytes() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestMain(m *testing.M) {
}

func TestBytesPool(t *testing.T) {
chunkPool, err := NewBucketedBytes(10, 100, 2, 1000)
chunkPool, err := NewBucketedPool[byte](10, 100, 2, 1000)
testutil.Ok(t, err)

testutil.Equals(t, []int{10, 20, 40, 80}, chunkPool.sizes)
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestBytesPool(t *testing.T) {
}

func TestRacePutGet(t *testing.T) {
chunkPool, err := NewBucketedBytes(3, 100, 2, 5000)
chunkPool, err := NewBucketedPool[byte](3, 100, 2, 5000)
testutil.Ok(t, err)

s := sync.WaitGroup{}
Expand Down
15 changes: 8 additions & 7 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -382,7 +383,7 @@ type BucketStore struct {
indexCache storecache.IndexCache
indexReaderPool *indexheader.ReaderPool
buffers sync.Pool
chunkPool pool.Bytes
chunkPool pool.Pool[byte]
seriesBatchSize int

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
Expand Down Expand Up @@ -504,7 +505,7 @@ func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
}

// WithChunkPool sets a pool.Bytes to use for chunks.
func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
func WithChunkPool(chunkPool pool.Pool[byte]) BucketStoreOption {
return func(s *BucketStore) {
s.chunkPool = chunkPool
}
Expand Down Expand Up @@ -600,7 +601,7 @@ func NewBucketStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
chunkPool: pool.NoopBytes{},
chunkPool: pool.NoopPool[byte]{},
blocks: map[ulid.ULID]*bucketBlock{},
blockSets: map[uint64]*bucketBlockSet{},
blockSyncConcurrency: blockSyncConcurrency,
Expand Down Expand Up @@ -2321,7 +2322,7 @@ type bucketBlock struct {
meta *metadata.Meta
dir string
indexCache storecache.IndexCache
chunkPool pool.Bytes
chunkPool pool.Pool[byte]
extLset labels.Labels

indexHeaderReader indexheader.Reader
Expand All @@ -2347,7 +2348,7 @@ func newBucketBlock(
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.Bytes,
chunkPool pool.Pool[byte],
indexHeadReader indexheader.Reader,
p Partitioner,
maxSeriesSizeFunc BlockEstimator,
Expand Down Expand Up @@ -3874,6 +3875,6 @@ func (s *queryStats) toHints() *hintspb.QueryStats {
}

// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) {
return pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, maxChunkPoolBytes)
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Pool[byte], error) {
return pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, maxChunkPoolBytes)
}
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -1492,7 +1493,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
f, err := block.NewRawMetaFetcher(logger, ibkt, baseBlockIDsFetcher)
testutil.Ok(t, err)

chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
chunkPool, err := pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
testutil.Ok(t, err)

st, err := NewBucketStore(
Expand Down Expand Up @@ -1599,7 +1600,7 @@ func (m fakePool) Get(sz int) (*[]byte, error) {
func (m fakePool) Put(_ *[]byte) {}

type mockedPool struct {
parent pool.Bytes
parent pool.Pool[byte]
balance atomic.Uint64
gets atomic.Uint64
}
Expand Down Expand Up @@ -1634,7 +1635,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
Source: metadata.TestSource,
}

chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 100e7)
chunkPool, err := pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 100e7)
testutil.Ok(t, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{
Expand Down Expand Up @@ -2714,7 +2715,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

// Create a chunk pool with buckets between 8B and 32KB.
chunkPool, err := pool.NewBucketedBytes(8, 32*1024, 2, 1e10)
chunkPool, err := pool.NewBucketedPool[byte](8, 32*1024, 2, 1e10)
testutil.Ok(b, err)

// Create a bucket block with only the dependencies we need for the benchmark.
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"

extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
"github.com/thanos-io/thanos/pkg/pool"
)
Expand Down Expand Up @@ -192,7 +193,7 @@ func maximumDecodedLenSnappyStreamed(in []byte) (int, error) {
return maxDecodedLen, nil
}

var decodedBufPool = pool.MustNewBucketedBytes(1024, 65536, 2, 0)
var decodedBufPool = pool.MustNewBucketedPool[byte](1024, 65536, 2, 0)

func newStreamedDiffVarintPostings(input []byte, disablePooling bool) (closeablePostings, error) {
// We can't use the regular s2.Reader because it assumes a stream.
Expand Down Expand Up @@ -449,7 +450,7 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
}

// Creating 15 buckets from 1k to 32mb.
var snappyDecodePool = pool.MustNewBucketedBytes(1024, 32*1024*1024, 2, 0)
var snappyDecodePool = pool.MustNewBucketedPool[byte](1024, 32*1024*1024, 2, 0)

type closeablePostings interface {
index.Postings
Expand Down

0 comments on commit 600a298

Please sign in to comment.