Skip to content

Commit

Permalink
chore: Add context to logger once
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Nov 22, 2024
1 parent d2e1992 commit 9d4d0a0
Showing 1 changed file with 5 additions and 22 deletions.
27 changes: 5 additions & 22 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func newReaderService(
cfg: cfg,
reader: reader,
consumerFactory: consumerFactory,
logger: logger,
logger: log.With(logger, "partition", reader.Partition(), "consumer_group", reader.ConsumerGroup()),
metrics: newServiceMetrics(reg),
lastProcessedOffset: -1,
}
Expand All @@ -131,11 +131,7 @@ func newReaderService(
}

func (s *ReaderService) starting(ctx context.Context) error {
level.Info(s.logger).Log(
"msg", "starting reader service",
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
)
level.Info(s.logger).Log("msg", "starting reader service")
s.metrics.reportOwnerOfPartition(s.reader.Partition())
s.metrics.reportStarting()

Expand All @@ -146,11 +142,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
}

if lastCommittedOffset == int64(KafkaEndOffset) {
level.Warn(s.logger).Log(
"msg", "no committed offset found for partition, starting from the beginning",
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
)
level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
lastCommittedOffset = int64(KafkaStartOffset)
}

Expand Down Expand Up @@ -178,12 +170,7 @@ func (s *ReaderService) starting(ctx context.Context) error {

err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, maxLag, targetLag, recordsChan)
if err != nil {
level.Error(s.logger).Log(
"msg", "failed to catch up to max lag",
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
"err", err,
)
level.Error(s.logger).Log("msg", "failed to catch up to max lag", "err", err)
return err
}
}
Expand All @@ -192,11 +179,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
}

func (s *ReaderService) running(ctx context.Context) error {
level.Info(s.logger).Log(
"msg", "reader service running",
"partition", s.reader.Partition(),
"consumer_group", s.reader.ConsumerGroup(),
)
level.Info(s.logger).Log("msg", "reader service running")
s.metrics.reportRunning()

consumer, err := s.consumerFactory(s.committer)
Expand Down

0 comments on commit 9d4d0a0

Please sign in to comment.