diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 3ddbd16e08e42..1eafb415313d3 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -950,35 +950,33 @@ func BenchmarkRead(b *testing.B) { } } -func BenchmarkReadWithStructuredMetadata(b *testing.B) { - b.Run("v5", func(b *testing.B) { - c := NewMemChunk(ChunkFormatV5, compression.Snappy, UnorderedWithOrganizedStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) - fillChunk(c) - b.ResetTimer() - ctx := context.Background() - for n := 0; n < b.N; n++ { - iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true) - for iterator.Next() { - } - if err := iterator.Close(); err != nil { - b.Fatal(err) - } +func BenchmarkReadWithStructuredMetadataV4(b *testing.B) { + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + fillChunk(c) + b.ResetTimer() + ctx := context.Background() + for n := 0; n < b.N; n++ { + iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true) + for iterator.Next() { } - }) - // b.Run("v5", func(b *testing.B) { - // c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) - // fillChunk(c) - // b.ResetTimer() - // ctx := context.Background() - // for n := 0; n < b.N; n++ { - // iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true) - // for iterator.Next() { - // } - // if err := iterator.Close(); err != nil { - // b.Fatal(err) - // } - // } - // }) + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } +} +func BenchmarkReadWithStructuredMetadataV5(b *testing.B) { + c := NewMemChunk(ChunkFormatV5, compression.Snappy, UnorderedWithOrganizedStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + fillChunk(c) + b.ResetTimer() + ctx := context.Background() + for n := 0; n < b.N; n++ { + iterator := c.SampleIterator(ctx, time.Unix(0, 0), time.Now(), countExtractor, true) + for iterator.Next() { + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } } type noopTestPipeline struct{} diff --git a/pkg/chunkenc/organized_head.go b/pkg/chunkenc/organized_head.go index 06b4adeade51a..f013f522c14b5 100644 --- a/pkg/chunkenc/organized_head.go +++ b/pkg/chunkenc/organized_head.go @@ -96,7 +96,7 @@ func (b *organisedHeadBlock) CompressedBlock(pool compression.WriterPool) (block }, len(bl), nil } -func (b *organisedHeadBlock) serialiseStructuredMetadata(pool compression.WriterPool) ([]byte, error) { +func (b *organisedHeadBlock) serialiseStructuredMetadataComp(pool compression.WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() @@ -148,7 +148,58 @@ func (b *organisedHeadBlock) serialiseStructuredMetadata(pool compression.Writer return outBuf.Bytes(), nil } -func (b *organisedHeadBlock) serialiseTimestamps(pool compression.WriterPool) ([]byte, error) { +func (b *organisedHeadBlock) serialiseStructuredMetadata(_ compression.WriterPool) ([]byte, error) { + symbolsSectionBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) + defer func() { + symbolsSectionBuf.Reset() + serializeBytesBufferPool.Put(symbolsSectionBuf) + }() + + outBuf := &bytes.Buffer{} + encBuf := make([]byte, binary.MaxVarintLen64) + + _ = b.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64, + func(_ *stats.Context, _ int64, _ string, symbols symbols) error { + symbolsSectionBuf.Reset() + n := binary.PutUvarint(encBuf, uint64(len(symbols))) + symbolsSectionBuf.Write(encBuf[:n]) + + for _, l := range symbols { + n = binary.PutUvarint(encBuf, uint64(l.Name)) + symbolsSectionBuf.Write(encBuf[:n]) + + n = binary.PutUvarint(encBuf, uint64(l.Value)) + symbolsSectionBuf.Write(encBuf[:n]) + } + + // write the length of symbols section + n = binary.PutUvarint(encBuf, uint64(symbolsSectionBuf.Len())) + outBuf.Write(encBuf[:n]) + outBuf.Write(symbolsSectionBuf.Bytes()) + + return nil + }, + ) + + return outBuf.Bytes(), nil +} + +func (b *organisedHeadBlock) serialiseTimestamps(_ compression.WriterPool) ([]byte, error) { + outBuf := &bytes.Buffer{} + encBuf := make([]byte, binary.MaxVarintLen64) + + _ = b.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64, + func(_ *stats.Context, ts int64, _ string, _ symbols) error { + n := binary.PutVarint(encBuf, ts) + outBuf.Write(encBuf[:n]) + return nil + }, + ) + + return outBuf.Bytes(), nil +} + +func (b *organisedHeadBlock) serialiseTimestampsComp(pool compression.WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() @@ -247,7 +298,7 @@ type organizedBufferedIterator struct { func (e *organizedBufferedIterator) Next() bool { var decompressedBytes, decompressedStructuredMetadataBytes int64 - if !e.closed && e.reader == nil { + if !e.closed && e.reader == nil && !e.queryMetricsOnly { var err error // todo(shantanu): handle all errors @@ -259,23 +310,11 @@ func (e *organizedBufferedIterator) Next() bool { } if !e.closed && e.tsReader == nil { - var err error - - // todo(shantanu): handle all errors - e.tsReader, err = e.pool.GetReader(bytes.NewReader(e.tsBytes)) - if err != nil { - e.err = err - return false - } + e.tsReader = bytes.NewReader(e.tsBytes) } if !e.closed && e.smReader == nil { - var err error - e.smReader, err = e.pool.GetReader(bytes.NewReader(e.smBytes)) - if err != nil { - e.err = err - return false - } + e.smReader = bytes.NewReader(e.smBytes) } // todo (shantanu): need a better way to close the iterator instead of individually doing this.