diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index dedcf183485c..b643182e617b 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -136,7 +136,8 @@ func (s *ReaderService) starting(ctx context.Context) error { "partition", s.reader.Partition(), "consumer_group", s.reader.ConsumerGroup(), ) - s.metrics.reportStarting(s.reader.Partition()) + s.metrics.reportOwnerOfPartition(s.reader.Partition()) + s.metrics.reportStarting() // Fetch the last committed offset to determine where to start reading lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx) @@ -196,7 +197,7 @@ func (s *ReaderService) running(ctx context.Context) error { "partition", s.reader.Partition(), "consumer_group", s.reader.ConsumerGroup(), ) - s.metrics.reportRunning(s.reader.Partition()) + s.metrics.reportRunning() consumer, err := s.consumerFactory(s.committer) if err != nil { @@ -396,14 +397,16 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record { return records } -func (s *serviceMetrics) reportStarting(partition int32) { - s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1) +func (s *serviceMetrics) reportOwnerOfPartition(id int32) { + s.partition.WithLabelValues(strconv.Itoa(int(id))).Set(1) +} + +func (s *serviceMetrics) reportStarting() { s.phase.WithLabelValues(phaseStarting).Set(1) s.phase.WithLabelValues(phaseRunning).Set(0) } -func (s *serviceMetrics) reportRunning(partition int32) { - s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1) +func (s *serviceMetrics) reportRunning() { s.phase.WithLabelValues(phaseStarting).Set(0) s.phase.WithLabelValues(phaseRunning).Set(1) }