Skip to content

Commit

Permalink
chore(metric_extraction): Optimize labels result (#15068)
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi authored Nov 22, 2024
1 parent cbdd36a commit 2ae1ead
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
30 changes: 25 additions & 5 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,16 +583,36 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {
return b.currentResult
}

stream := b.labels(StreamLabel).Copy()
structuredMetadata := b.labels(StructuredMetadataLabel).Copy()
parsed := b.labels(ParsedLabel).Copy()
b.buf = flattenLabels(b.buf, stream, structuredMetadata, parsed)
// Get all labels at once and sort them
b.buf = b.UnsortedLabels(b.buf)
sort.Sort(b.buf)
hash := b.hasher.Hash(b.buf)

if cached, ok := b.resultCache[hash]; ok {
return cached
}

result := NewLabelsResult(b.buf.String(), hash, stream, structuredMetadata, parsed)
// Now segregate the sorted labels into their categories
var stream, meta, parsed []labels.Label

for _, l := range b.buf {
// Skip error labels for stream and meta categories
if l.Name == logqlmodel.ErrorLabel || l.Name == logqlmodel.ErrorDetailsLabel {
parsed = append(parsed, l)
continue
}

// Check which category this label belongs to
if labelsContain(b.add[ParsedLabel], l.Name) {
parsed = append(parsed, l)
} else if labelsContain(b.add[StructuredMetadataLabel], l.Name) {
meta = append(meta, l)
} else {
stream = append(stream, l)
}
}

result := NewLabelsResult(b.buf.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...))
b.resultCache[hash] = result

return result
Expand Down
39 changes: 39 additions & 0 deletions pkg/logql/log/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package log
import (
"sort"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -463,3 +464,41 @@ func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) {
res.String(),
)
}

// benchmark streamLineSampleExtractor.Process method
func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) {
// Setup some test data
baseLabels := labels.FromStrings(
"namespace", "prod",
"cluster", "us-east-1",
"pod", "my-pod-123",
"container", "main",
"stream", "stdout",
)

structuredMeta := []labels.Label{
{Name: "level", Value: "info"},
{Name: "caller", Value: "http.go:42"},
{Name: "user", Value: "john"},
{Name: "trace_id", Value: "abc123"},
}

testLine := []byte(`{"timestamp":"2024-01-01T00:00:00Z","level":"info","message":"test message","duration_ms":150}`)

// JSON parsing + filtering + label extraction
matcher := labels.MustNewMatcher(labels.MatchEqual, "level", "info")
filter := NewStringLabelFilter(matcher)
stages := []Stage{
NewJSONParser(),
filter,
}
ex, err := NewLineSampleExtractor(CountExtractor, stages, []string{}, false, false)
require.NoError(b, err)
streamEx := ex.ForStream(baseLabels)
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _, _ = streamEx.Process(time.Now().UnixNano(), testLine, structuredMeta...)
}
}

0 comments on commit 2ae1ead

Please sign in to comment.