From 9d4d0a031eefcbb0a1274844317c19ab665036d4 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Fri, 22 Nov 2024 18:03:09 +0000 Subject: [PATCH] chore: Add context to logger once --- pkg/kafka/partition/reader_service.go | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index b643182e617b5..abdf5b7e24e25 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -118,7 +118,7 @@ func newReaderService( cfg: cfg, reader: reader, consumerFactory: consumerFactory, - logger: logger, + logger: log.With(logger, "partition", reader.Partition(), "consumer_group", reader.ConsumerGroup()), metrics: newServiceMetrics(reg), lastProcessedOffset: -1, } @@ -131,11 +131,7 @@ func newReaderService( } func (s *ReaderService) starting(ctx context.Context) error { - level.Info(s.logger).Log( - "msg", "starting reader service", - "partition", s.reader.Partition(), - "consumer_group", s.reader.ConsumerGroup(), - ) + level.Info(s.logger).Log("msg", "starting reader service") s.metrics.reportOwnerOfPartition(s.reader.Partition()) s.metrics.reportStarting() @@ -146,11 +142,7 @@ func (s *ReaderService) starting(ctx context.Context) error { } if lastCommittedOffset == int64(KafkaEndOffset) { - level.Warn(s.logger).Log( - "msg", "no committed offset found for partition, starting from the beginning", - "partition", s.reader.Partition(), - "consumer_group", s.reader.ConsumerGroup(), - ) + level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset)) lastCommittedOffset = int64(KafkaStartOffset) } @@ -178,12 +170,7 @@ func (s *ReaderService) starting(ctx context.Context) error { err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, maxLag, targetLag, recordsChan) if err != nil { - level.Error(s.logger).Log( - "msg", "failed to catch up to max lag", - "partition", s.reader.Partition(), - "consumer_group", s.reader.ConsumerGroup(), - "err", err, - ) + level.Error(s.logger).Log("msg", "failed to catch up to max lag", "err", err) return err } } @@ -192,11 +179,7 @@ func (s *ReaderService) starting(ctx context.Context) error { } func (s *ReaderService) running(ctx context.Context) error { - level.Info(s.logger).Log( - "msg", "reader service running", - "partition", s.reader.Partition(), - "consumer_group", s.reader.ConsumerGroup(), - ) + level.Info(s.logger).Log("msg", "reader service running") s.metrics.reportRunning() consumer, err := s.consumerFactory(s.committer)