Skip to content

Commit

Permalink
Use no compression for structured metadata and timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi committed Nov 22, 2024
1 parent 42f33d8 commit eae46c0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 45 deletions.
54 changes: 26 additions & 28 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,35 +950,33 @@ func BenchmarkRead(b *testing.B) {
}
}

func BenchmarkReadWithStructuredMetadata(b *testing.B) {
b.Run("v5", func(b *testing.B) {
c := NewMemChunk(ChunkFormatV5, compression.Snappy, UnorderedWithOrganizedStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
b.ResetTimer()
ctx := context.Background()
for n := 0; n < b.N; n++ {
iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true)
for iterator.Next() {
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
func BenchmarkReadWithStructuredMetadataV4(b *testing.B) {
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
b.ResetTimer()
ctx := context.Background()
for n := 0; n < b.N; n++ {
iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true)
for iterator.Next() {
}
})
// b.Run("v5", func(b *testing.B) {
// c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
// fillChunk(c)
// b.ResetTimer()
// ctx := context.Background()
// for n := 0; n < b.N; n++ {
// iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true)
// for iterator.Next() {
// }
// if err := iterator.Close(); err != nil {
// b.Fatal(err)
// }
// }
// })
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
}
func BenchmarkReadWithStructuredMetadataV5(b *testing.B) {
c := NewMemChunk(ChunkFormatV5, compression.Snappy, UnorderedWithOrganizedStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
b.ResetTimer()
ctx := context.Background()
for n := 0; n < b.N; n++ {
iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true)
for iterator.Next() {
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
}

type noopTestPipeline struct{}
Expand Down
73 changes: 56 additions & 17 deletions pkg/chunkenc/organized_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (b *organisedHeadBlock) CompressedBlock(pool compression.WriterPool) (block
}, len(bl), nil
}

func (b *organisedHeadBlock) serialiseStructuredMetadata(pool compression.WriterPool) ([]byte, error) {
func (b *organisedHeadBlock) serialiseStructuredMetadataComp(pool compression.WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
Expand Down Expand Up @@ -148,7 +148,58 @@ func (b *organisedHeadBlock) serialiseStructuredMetadata(pool compression.Writer
return outBuf.Bytes(), nil
}

func (b *organisedHeadBlock) serialiseTimestamps(pool compression.WriterPool) ([]byte, error) {
func (b *organisedHeadBlock) serialiseStructuredMetadata(_ compression.WriterPool) ([]byte, error) {
symbolsSectionBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
symbolsSectionBuf.Reset()
serializeBytesBufferPool.Put(symbolsSectionBuf)
}()

outBuf := &bytes.Buffer{}
encBuf := make([]byte, binary.MaxVarintLen64)

_ = b.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64,
func(_ *stats.Context, _ int64, _ string, symbols symbols) error {
symbolsSectionBuf.Reset()
n := binary.PutUvarint(encBuf, uint64(len(symbols)))
symbolsSectionBuf.Write(encBuf[:n])

for _, l := range symbols {
n = binary.PutUvarint(encBuf, uint64(l.Name))
symbolsSectionBuf.Write(encBuf[:n])

n = binary.PutUvarint(encBuf, uint64(l.Value))
symbolsSectionBuf.Write(encBuf[:n])
}

// write the length of symbols section
n = binary.PutUvarint(encBuf, uint64(symbolsSectionBuf.Len()))
outBuf.Write(encBuf[:n])
outBuf.Write(symbolsSectionBuf.Bytes())

return nil
},
)

return outBuf.Bytes(), nil
}

func (b *organisedHeadBlock) serialiseTimestamps(_ compression.WriterPool) ([]byte, error) {
outBuf := &bytes.Buffer{}
encBuf := make([]byte, binary.MaxVarintLen64)

_ = b.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64,
func(_ *stats.Context, ts int64, _ string, _ symbols) error {
n := binary.PutVarint(encBuf, ts)
outBuf.Write(encBuf[:n])
return nil
},
)

return outBuf.Bytes(), nil
}

func (b *organisedHeadBlock) serialiseTimestampsComp(pool compression.WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
Expand Down Expand Up @@ -247,7 +298,7 @@ type organizedBufferedIterator struct {

func (e *organizedBufferedIterator) Next() bool {
var decompressedBytes, decompressedStructuredMetadataBytes int64
if !e.closed && e.reader == nil {
if !e.closed && e.reader == nil && !e.queryMetricsOnly {
var err error

// todo(shantanu): handle all errors
Expand All @@ -259,23 +310,11 @@ func (e *organizedBufferedIterator) Next() bool {
}

if !e.closed && e.tsReader == nil {
var err error

// todo(shantanu): handle all errors
e.tsReader, err = e.pool.GetReader(bytes.NewReader(e.tsBytes))
if err != nil {
e.err = err
return false
}
e.tsReader = bytes.NewReader(e.tsBytes)
}

if !e.closed && e.smReader == nil {
var err error
e.smReader, err = e.pool.GetReader(bytes.NewReader(e.smBytes))
if err != nil {
e.err = err
return false
}
e.smReader = bytes.NewReader(e.smBytes)
}

// todo (shantanu): need a better way to close the iterator instead of individually doing this.
Expand Down

0 comments on commit eae46c0

Please sign in to comment.