Skip to content

Commit

Permalink
Merge branch 'main' into Jayclifford345-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
JStickler authored Nov 22, 2024
2 parents cc6bac6 + d2e1992 commit ac9a691
Show file tree
Hide file tree
Showing 23 changed files with 176 additions and 96 deletions.
2 changes: 0 additions & 2 deletions docs/sources/get-started/labels/structured-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ You should only use structured metadata in the following situations:
- If you are using [Explore Logs](https://grafana.com/docs/grafana-cloud/visualizations/simplified-exploration/logs/) to visualize and explore your Loki logs. You must set `discover_log_levels` and `allow_structured_metadata` to `true` in your Loki configuration.
- If you are a large-scale customer, who is ingesting more than 75TB of logs a month and are using [Bloom filters](https://grafana.com/docs/loki/<LOKI_VERSION>/operations/bloom-filters/) (Experimental), starting in [Loki 3.3](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v3-3/) Bloom filters now utilize structured metadata.

We do not recommend extracting information that already exists in your log lines and putting it into structured metadata.

## Attaching structured metadata to log lines

You have the option to attach structured metadata to log lines in the push payload along with each log line and the timestamp.
Expand Down
1 change: 1 addition & 0 deletions docs/sources/operations/bloom-filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ aliases:
{{< admonition type="warning" >}}
This feature is an [experimental feature](/docs/release-life-cycle/). Engineering and on-call support is not available. No SLA is provided.
Note that this feature is intended for users who are ingesting more than 75TB of logs a month, as it is designed to accelerate queries against large volumes of logs.
Query acceleration via Bloom filters is enabled for select Grafana Cloud customers ingesting more than 75TB of logs a month.
{{< /admonition >}}

Loki leverages [bloom filters](https://en.wikipedia.org/wiki/Bloom_filter) to speed up queries by reducing the amount of data Loki needs to load from the store and iterate through.
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.55.5
github.com/baidubce/bce-sdk-go v0.9.200
github.com/baidubce/bce-sdk-go v0.9.201
github.com/bmatcuk/doublestar/v4 v4.7.1
github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500
github.com/cespare/xxhash/v2 v2.3.0
Expand Down Expand Up @@ -103,7 +103,7 @@ require (
golang.org/x/sync v0.9.0
golang.org/x/sys v0.27.0
golang.org/x/time v0.8.0
google.golang.org/api v0.208.0
google.golang.org/api v0.209.0
google.golang.org/grpc v1.68.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,8 @@ github.com/aws/smithy-go v1.11.1 h1:IQ+lPZVkSM3FRtyaDox41R8YS6iwPMYIreejOgPW49g=
github.com/aws/smithy-go v1.11.1/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM=
github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo=
github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM=
github.com/baidubce/bce-sdk-go v0.9.200 h1:zF3yuKp/wkKZhutCZYl5HtIZJPziWsPEu1kxHEyOaWI=
github.com/baidubce/bce-sdk-go v0.9.200/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/baidubce/bce-sdk-go v0.9.201 h1:gIuvsE6azuwICmPc/P3IfyUk/vU3IqkwkT3J94vpOeo=
github.com/baidubce/bce-sdk-go v0.9.201/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
Expand Down Expand Up @@ -3487,8 +3487,8 @@ google.golang.org/api v0.122.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZ
google.golang.org/api v0.124.0/go.mod h1:xu2HQurE5gi/3t1aFCvhPD781p0a3p11sdunTJ2BlP4=
google.golang.org/api v0.125.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw=
google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw=
google.golang.org/api v0.208.0 h1:8Y62MUGRviQnnP9/41/bYAGySPKAN9iwzV96ZvhwyVE=
google.golang.org/api v0.208.0/go.mod h1:I53S168Yr/PNDNMi5yPnDc0/LGRZO6o7PoEbl/HY3CM=
google.golang.org/api v0.209.0 h1:Ja2OXNlyRlWCWu8o+GgI4yUn/wz9h/5ZfFbKz+dQX+w=
google.golang.org/api v0.209.0/go.mod h1:I53S168Yr/PNDNMi5yPnDc0/LGRZO6o7PoEbl/HY3CM=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
4 changes: 2 additions & 2 deletions pkg/blockbuilder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type PartitionController interface {
// containing log data and "committed" is the consumer group
type PartitionJobController struct {
stepLen int64
part partition.ReaderIfc
part partition.Reader
backoff backoff.Config
decoder *kafka.Decoder
}

func NewPartitionJobController(
controller partition.ReaderIfc,
controller partition.Reader,
backoff backoff.Config,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
Expand Down
3 changes: 1 addition & 2 deletions pkg/kafka/client/reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -36,7 +35,7 @@ func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.L
)
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, errors.Wrap(err, "creating kafka client")
return nil, fmt.Errorf("creating kafka client: %w", err)
}
if kafkaCfg.AutoCreateTopicEnabled {
setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kafka/partition/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type partitionCommitter struct {
lastCommittedOffset prometheus.Gauge

logger log.Logger
reader ReaderIfc
reader Reader
commitFreq time.Duration

toCommit *atomic.Int64
wg sync.WaitGroup
cancel context.CancelFunc
}

func newCommitter(reader ReaderIfc, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
reader: reader,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) {
reg := prometheus.NewRegistry()
partitionID := int32(1)
consumerGroup := "test-consumer-group"
reader := newReader(
reader := newStdReader(
client,
kafkaCfg.Topic,
partitionID,
Expand Down
36 changes: 18 additions & 18 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Record struct {
Offset int64
}

type ReaderIfc interface {
type Reader interface {
Topic() string
Partition() int32
ConsumerGroup() string
Expand Down Expand Up @@ -91,8 +91,8 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
}
}

// Reader provides low-level access to Kafka partition reading operations
type Reader struct {
// StdReader provides low-level access to Kafka partition reading operations
type StdReader struct {
client *kgo.Client
topic string
partitionID int32
Expand All @@ -101,13 +101,13 @@ type Reader struct {
logger log.Logger
}

func NewReader(
func NewStdReader(
cfg kafka.Config,
partitionID int32,
instanceID string,
logger log.Logger,
reg prometheus.Registerer,
) (*Reader, error) {
) (*StdReader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
Expand All @@ -120,7 +120,7 @@ func NewReader(
}

// Create the reader
return newReader(
return newStdReader(
c,
cfg.Topic,
partitionID,
Expand All @@ -130,16 +130,16 @@ func NewReader(
), nil
}

// NewReader creates a new Reader instance
func newReader(
// newStdReader creates a new StdReader instance
func newStdReader(
client *kgo.Client,
topic string,
partitionID int32,
consumerGroup string,
logger log.Logger,
reg prometheus.Registerer,
) *Reader {
return &Reader{
) *StdReader {
return &StdReader{
client: client,
topic: topic,
partitionID: partitionID,
Expand All @@ -150,22 +150,22 @@ func newReader(
}

// Topic returns the topic being read
func (r *Reader) Topic() string {
func (r *StdReader) Topic() string {
return r.topic
}

// Partition returns the partition being read
func (r *Reader) Partition() int32 {
func (r *StdReader) Partition() int32 {
return r.partitionID
}

// ConsumerGroup returns the consumer group
func (r *Reader) ConsumerGroup() string {
func (r *StdReader) ConsumerGroup() string {
return r.consumerGroup
}

// FetchLastCommittedOffset retrieves the last committed offset for this partition
func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
func (r *StdReader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
req := kmsg.NewPtrOffsetFetchRequest()
req.Topics = []kmsg.OffsetFetchRequestTopic{{
Topic: r.topic,
Expand Down Expand Up @@ -210,7 +210,7 @@ func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
}

// FetchPartitionOffset retrieves the offset for a specific position
func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
partitionReq.Partition = r.partitionID
partitionReq.Timestamp = int64(position)
Expand Down Expand Up @@ -257,7 +257,7 @@ func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffse
}

// Poll retrieves the next batch of records from Kafka
func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
start := time.Now()
fetches := r.client.PollFetches(ctx)
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -303,14 +303,14 @@ func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
return records, nil
}

func (r *Reader) SetOffsetForConsumption(offset int64) {
func (r *StdReader) SetOffsetForConsumption(offset int64) {
r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
r.topic: {r.partitionID: kgo.NewOffset().At(offset)},
})
}

// Commit commits an offset to the consumer group
func (r *Reader) Commit(ctx context.Context, offset int64) error {
func (r *StdReader) Commit(ctx context.Context, offset int64) error {
admin := kadm.NewClient(r.client)

// Commit the last consumed offset.
Expand Down
27 changes: 15 additions & 12 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type ReaderService struct {
services.Service

cfg ReaderConfig
reader ReaderIfc
reader Reader
consumerFactory ConsumerFactory
logger log.Logger
metrics *serviceMetrics
Expand All @@ -82,7 +82,7 @@ func NewReaderService(
) (*ReaderService, error) {

// Create the reader
reader, err := NewReader(
reader, err := NewStdReader(
kafkaCfg,
partitionID,
instanceID,
Expand All @@ -91,10 +91,10 @@ func NewReaderService(
)

if err != nil {
return nil, errors.Wrap(err, "creating kafka reader")
return nil, fmt.Errorf("creating kafka reader: %w", err)
}

return newReaderServiceFromIfc(
return newReaderService(
ReaderConfig{
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup,
MaxConsumerLagAtStartup: kafkaCfg.MaxConsumerLagAtStartup,
Expand All @@ -107,9 +107,9 @@ func NewReaderService(
), nil
}

func newReaderServiceFromIfc(
func newReaderService(
cfg ReaderConfig,
reader ReaderIfc,
reader Reader,
consumerFactory ConsumerFactory,
logger log.Logger,
reg prometheus.Registerer,
Expand All @@ -136,7 +136,8 @@ func (s *ReaderService) starting(ctx context.Context) error {
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
)
s.metrics.reportStarting(s.reader.Partition())
s.metrics.reportOwnerOfPartition(s.reader.Partition())
s.metrics.reportStarting()

// Fetch the last committed offset to determine where to start reading
lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
Expand Down Expand Up @@ -196,7 +197,7 @@ func (s *ReaderService) running(ctx context.Context) error {
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
)
s.metrics.reportRunning(s.reader.Partition())
s.metrics.reportRunning()

consumer, err := s.consumerFactory(s.committer)
if err != nil {
Expand Down Expand Up @@ -396,14 +397,16 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
return records
}

func (s *serviceMetrics) reportStarting(partition int32) {
s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1)
func (s *serviceMetrics) reportOwnerOfPartition(id int32) {
s.partition.WithLabelValues(strconv.Itoa(int(id))).Set(1)
}

func (s *serviceMetrics) reportStarting() {
s.phase.WithLabelValues(phaseStarting).Set(1)
s.phase.WithLabelValues(phaseRunning).Set(0)
}

func (s *serviceMetrics) reportRunning(partition int32) {
s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1)
func (s *serviceMetrics) reportRunning() {
s.phase.WithLabelValues(phaseStarting).Set(0)
s.phase.WithLabelValues(phaseRunning).Set(1)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func readersFromKafkaCfg(
kafkaCfg kafka.Config,
consumerFactory ConsumerFactory,
partition int32,
) (ReaderIfc, *ReaderService) {
) (Reader, *ReaderService) {
partitionReader, err := NewReaderService(
kafkaCfg,
partition,
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}

reader, err := partition.NewReader(
reader, err := partition.NewStdReader(
t.Cfg.KafkaConfig,
ingestPartitionID,
id,
Expand Down
22 changes: 20 additions & 2 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"unsafe"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -211,12 +212,29 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
var linesSkipped *prometheus.CounterVec
if d.metrics != nil {
linesSkipped = d.metrics.LinesSkipped
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state, linesSkipped)
if d.tokens == nil && d.state == nil {
return nil
}

return d.train(d.tokens, d.state, ts)
}

func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
if d.metrics != nil && d.metrics.LinesSkipped != nil {
d.metrics.LinesSkipped.WithLabelValues(TooFewTokens).Inc()
}
return nil
}
if len(tokens) > 80 {
if d.metrics != nil && d.metrics.LinesSkipped != nil {
d.metrics.LinesSkipped.WithLabelValues(TooManyTokens).Inc()
}
return nil
}
if d.metrics != nil {
Expand Down Expand Up @@ -255,7 +273,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state)
tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state, d.metrics.LinesSkipped)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
Expand Down
Loading

0 comments on commit ac9a691

Please sign in to comment.