From 83eff304197ab40bc35c82b7844443f80d28a300 Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Wed, 20 Nov 2024 16:29:12 +0000
Subject: [PATCH] Use the same variable name in method of Reader structs

---
 pkg/kafka/partition/reader.go | 154 +++++++++++++++++-----------------
 1 file changed, 77 insertions(+), 77 deletions(-)

diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index 31d1e65e79eb0..784e01de9ac7e 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -93,33 +93,33 @@ func NewReader(
 // start initializes the Kafka client and committer for the PartitionReader.
 // This method is called when the PartitionReader service starts.
-func (p *Reader) start(ctx context.Context) error {
+func (r *Reader) start(ctx context.Context) error {
 	var err error
-	p.client, err = client.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger)
+	r.client, err = client.NewReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger)
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
-	p.metrics.reportStarting(p.partitionID)
+	r.metrics.reportStarting(r.partitionID)
 
 	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
-	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
+	lastCommittedOffset := r.fetchLastCommittedOffset(ctx)
 	if lastCommittedOffset == kafkaEndOffset {
-		level.Warn(p.logger).Log("msg", "no committed offset found for partition, starting from the beginning", "partition", p.partitionID, "consumer_group", p.consumerGroup)
+		level.Warn(r.logger).Log("msg", "no committed offset found for partition, starting from the beginning", "partition", r.partitionID, "consumer_group", r.consumerGroup)
 		lastCommittedOffset = kafkaStartOffset // If we haven't committed any offsets yet, we start reading from the beginning.
 	}
 	if lastCommittedOffset > 0 {
 		lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
 	}
 
-	p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
-		p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
+	r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
 	})
 
-	level.Info(p.logger).Log("msg", "initialising partition reader", "last_committed_offset", lastCommittedOffset, "partition", p.partitionID, "consumer_group", p.consumerGroup)
+	level.Info(r.logger).Log("msg", "initialising partition reader", "last_committed_offset", lastCommittedOffset, "partition", r.partitionID, "consumer_group", r.consumerGroup)
 
-	p.committer = newCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg)
+	r.committer = newCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg)
 
-	if targetLag, maxLag := p.kafkaCfg.TargetConsumerLagAtStartup, p.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
-		consumer, err := p.consumerFactory(p.committer)
+	if targetLag, maxLag := r.kafkaCfg.TargetConsumerLagAtStartup, r.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
+		consumer, err := r.consumerFactory(r.committer)
 		if err != nil {
 			return fmt.Errorf("creating consumer: %w", err)
 		}
@@ -128,16 +128,16 @@ func (p *Reader) start(ctx context.Context) error {
 		// Temporarily start a consumer to do the initial update
 		recordsChan := make(chan []Record)
 		wait := consumer.Start(cancelCtx, recordsChan)
-		// Shutdown the consumer after catching up. We start a new instance in the run method to tie the lifecycle to the run context.
+		// Shut down the consumer after catching up. We start a new instance in the run method to tie the lifecycle to the run context.
 		defer func() {
 			close(recordsChan)
 			cancel()
 			wait()
 		}()
 
-		err = p.processNextFetchesUntilTargetOrMaxLagHonored(ctx, p.kafkaCfg.MaxConsumerLagAtStartup, p.kafkaCfg.TargetConsumerLagAtStartup, recordsChan)
+		err = r.processNextFetchesUntilTargetOrMaxLagHonored(ctx, r.kafkaCfg.MaxConsumerLagAtStartup, r.kafkaCfg.TargetConsumerLagAtStartup, recordsChan)
 		if err != nil {
-			level.Error(p.logger).Log("msg", "failed to catch up to max lag", "partition", p.partitionID, "consumer_group", p.consumerGroup, "err", err)
+			level.Error(r.logger).Log("msg", "failed to catch up to max lag", "partition", r.partitionID, "consumer_group", r.consumerGroup, "err", err)
 			return err
 		}
 	}
@@ -147,91 +147,91 @@ func (p *Reader) start(ctx context.Context) error {
 
 // run is the main loop of the PartitionReader. It continuously fetches and processes
 // data from Kafka, and send it to the consumer.
-func (p *Reader) run(ctx context.Context) error {
-	level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
-	p.metrics.reportRunning(p.partitionID)
+func (r *Reader) run(ctx context.Context) error {
+	level.Info(r.logger).Log("msg", "starting partition reader", "partition", r.partitionID, "consumer_group", r.consumerGroup)
+	r.metrics.reportRunning(r.partitionID)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	consumer, err := p.consumerFactory(p.committer)
+	consumer, err := r.consumerFactory(r.committer)
 	if err != nil {
 		return errors.Wrap(err, "creating consumer")
 	}
 
-	recordsChan := p.startFetchLoop(ctx)
+	recordsChan := r.startFetchLoop(ctx)
 	wait := consumer.Start(ctx, recordsChan)
 
 	wait()
-	p.committer.Stop()
+	r.committer.Stop()
 	return nil
 }
 
-func (p *Reader) fetchLastCommittedOffset(ctx context.Context) int64 {
+func (r *Reader) fetchLastCommittedOffset(ctx context.Context) int64 {
 	// We manually create a request so that we can request the offset for a single partition
 	// only, which is more performant than requesting the offsets for all partitions.
 	req := kmsg.NewPtrOffsetFetchRequest()
-	req.Topics = []kmsg.OffsetFetchRequestTopic{{Topic: p.kafkaCfg.Topic, Partitions: []int32{p.partitionID}}}
-	req.Group = p.consumerGroup
+	req.Topics = []kmsg.OffsetFetchRequestTopic{{Topic: r.kafkaCfg.Topic, Partitions: []int32{r.partitionID}}}
+	req.Group = r.consumerGroup
 
-	resps := p.client.RequestSharded(ctx, req)
+	resps := r.client.RequestSharded(ctx, req)
 
 	// Since we issued a request for only 1 partition, we expect exactly 1 response.
 	if expected, actual := 1, len(resps); actual != expected {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of responses (expected: %d, got: %d)", expected, actual), "expected", expected, "actual", len(resps))
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected number of responses (expected: %d, got: %d)", expected, actual), "expected", expected, "actual", len(resps))
 		return kafkaStartOffset
 	}
 
 	// Ensure no error occurred.
 	res := resps[0]
 	if res.Err != nil {
-		level.Error(p.logger).Log("msg", "error fetching group offset for partition", "err", res.Err)
+		level.Error(r.logger).Log("msg", "error fetching group offset for partition", "err", res.Err)
 		return kafkaStartOffset
 	}
 
 	// Parse the response.
 	fetchRes, ok := res.Resp.(*kmsg.OffsetFetchResponse)
 	if !ok {
-		level.Error(p.logger).Log("msg", "unexpected response type")
+		level.Error(r.logger).Log("msg", "unexpected response type")
 		return kafkaStartOffset
 	}
 	if expected, actual := 1, len(fetchRes.Groups); actual != expected {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of groups in the response (expected: %d, got: %d)", expected, actual))
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected number of groups in the response (expected: %d, got: %d)", expected, actual))
 		return kafkaStartOffset
 	}
 	if expected, actual := 1, len(fetchRes.Groups[0].Topics); actual != expected {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of topics in the response (expected: %d, got: %d)", expected, actual))
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected number of topics in the response (expected: %d, got: %d)", expected, actual))
 		return kafkaStartOffset
 	}
-	if expected, actual := p.kafkaCfg.Topic, fetchRes.Groups[0].Topics[0].Topic; expected != actual {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected topic in the response (expected: %s, got: %s)", expected, actual))
+	if expected, actual := r.kafkaCfg.Topic, fetchRes.Groups[0].Topics[0].Topic; expected != actual {
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected topic in the response (expected: %s, got: %s)", expected, actual))
 		return kafkaStartOffset
 	}
 	if expected, actual := 1, len(fetchRes.Groups[0].Topics[0].Partitions); actual != expected {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of partitions in the response (expected: %d, got: %d)", expected, actual))
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected number of partitions in the response (expected: %d, got: %d)", expected, actual))
 		return kafkaStartOffset
 	}
-	if expected, actual := p.partitionID, fetchRes.Groups[0].Topics[0].Partitions[0].Partition; actual != expected {
-		level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected partition in the response (expected: %d, got: %d)", expected, actual))
+	if expected, actual := r.partitionID, fetchRes.Groups[0].Topics[0].Partitions[0].Partition; actual != expected {
+		level.Error(r.logger).Log("msg", fmt.Sprintf("unexpected partition in the response (expected: %d, got: %d)", expected, actual))
 		return kafkaStartOffset
 	}
 	if err := kerr.ErrorForCode(fetchRes.Groups[0].Topics[0].Partitions[0].ErrorCode); err != nil {
-		level.Error(p.logger).Log("msg", "unexpected error in the response", "err", err)
+		level.Error(r.logger).Log("msg", "unexpected error in the response", "err", err)
 		return kafkaStartOffset
 	}
 	return fetchRes.Groups[0].Topics[0].Partitions[0].Offset
 }
 
-func (p *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int64, error) {
+func (r *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int64, error) {
 	// Create a custom request to fetch the latest offset of a specific partition.
 	// We manually create a request so that we can request the offset for a single partition
 	// only, which is more performant than requesting the offsets for all partitions.
 	partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
-	partitionReq.Partition = p.partitionID
+	partitionReq.Partition = r.partitionID
 	partitionReq.Timestamp = position
 
 	topicReq := kmsg.NewListOffsetsRequestTopic()
-	topicReq.Topic = p.kafkaCfg.Topic
+	topicReq.Topic = r.kafkaCfg.Topic
 	topicReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{partitionReq}
 
 	req := kmsg.NewPtrListOffsetsRequest()
@@ -240,7 +240,7 @@ func (p *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int6
 
 	// Even if we share the same client, other in-flight requests are not canceled once this context is canceled
 	// (or its deadline is exceeded). We've verified it with a unit test.
-	resps := p.client.RequestSharded(ctx, req)
+	resps := r.client.RequestSharded(ctx, req)
 
 	// Since we issued a request for only 1 partition, we expect exactly 1 response.
 	if expected := 1; len(resps) != 1 {
@@ -261,13 +261,13 @@ func (p *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int6
 	if expected, actual := 1, len(listRes.Topics); actual != expected {
 		return 0, fmt.Errorf("unexpected number of topics in the response (expected: %d, got: %d)", expected, actual)
 	}
-	if expected, actual := p.kafkaCfg.Topic, listRes.Topics[0].Topic; expected != actual {
+	if expected, actual := r.kafkaCfg.Topic, listRes.Topics[0].Topic; expected != actual {
 		return 0, fmt.Errorf("unexpected topic in the response (expected: %s, got: %s)", expected, actual)
 	}
 	if expected, actual := 1, len(listRes.Topics[0].Partitions); actual != expected {
 		return 0, fmt.Errorf("unexpected number of partitions in the response (expected: %d, got: %d)", expected, actual)
 	}
-	if expected, actual := p.partitionID, listRes.Topics[0].Partitions[0].Partition; actual != expected {
+	if expected, actual := r.partitionID, listRes.Topics[0].Partitions[0].Partition; actual != expected {
 		return 0, fmt.Errorf("unexpected partition in the response (expected: %d, got: %d)", expected, actual)
 	}
 	if err := kerr.ErrorForCode(listRes.Topics[0].Partitions[0].ErrorCode); err != nil {
@@ -280,14 +280,14 @@ func (p *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int6
 // processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored.
 // This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be
 // reached once this function successfully returns (only maxLag is guaranteed).
-func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error {
-	logger := log.With(p.logger, "target_lag", targetLag, "max_lag", maxLag)
+func (r *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error {
+	logger := log.With(r.logger, "target_lag", targetLag, "max_lag", maxLag)
 
 	level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored")
 	attempts := []func() (time.Duration, error){
 		// First process fetches until at least the max lag is honored.
 		func() (time.Duration, error) {
-			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
+			return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
 		},
 
 		// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
@@ -300,13 +300,13 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 			timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
 			defer cancel()
 
-			return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
+			return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
 		},
 
 		// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
 		// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
 		func() (time.Duration, error) {
-			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
+			return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
 		},
 	}
 
@@ -324,7 +324,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 		if currLag <= targetLag {
 			level.Info(logger).Log(
 				"msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag",
-				"last_consumed_offset", p.committer.lastCommittedOffset,
+				"last_consumed_offset", r.committer.lastCommittedOffset,
 				"current_lag", currLag,
 			)
 			return nil
@@ -333,13 +333,13 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 
 	level.Warn(logger).Log(
 		"msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag",
-		"last_consumed_offset", p.committer.lastCommittedOffset,
+		"last_consumed_offset", r.committer.lastCommittedOffset,
 		"current_lag", currLag,
 	)
 	return nil
 }
 
-func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
+func (r *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
 	boff := backoff.New(ctx, backoff.Config{
 		MinBackoff: 100 * time.Millisecond,
 		MaxBackoff: time.Second,
@@ -349,20 +349,20 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 
 	for boff.Ongoing() {
 		// Send a direct request to the Kafka backend to fetch the partition start offset.
-		partitionStartOffset, err := p.fetchPartitionOffset(ctx, kafkaStartOffset)
+		partitionStartOffset, err := r.fetchPartitionOffset(ctx, kafkaStartOffset)
 		if err != nil {
 			level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
 			boff.Wait()
 			continue
 		}
 
-		consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx)
+		consumerGroupLastCommittedOffset := r.fetchLastCommittedOffset(ctx)
 
 		// Send a direct request to the Kafka backend to fetch the last produced offset.
 		// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
 		// latency.
 		lastProducedOffsetRequestedAt := time.Now()
-		lastProducedOffset, err := p.fetchPartitionOffset(ctx, kafkaEndOffset)
+		lastProducedOffset, err := r.fetchPartitionOffset(ctx, kafkaEndOffset)
 		if err != nil {
 			level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
 			boff.Wait()
@@ -387,19 +387,19 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 		// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
 		// lag. If we don't have it (lag is zero value), then it will not be logged.
-		level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
+		level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", r.lastProcessedOffset, "offset_lag", lastProducedOffset-r.lastProcessedOffset)
 
 		for boff.Ongoing() {
 			// Continue reading until we reached the desired offset.
-			if lastProducedOffset <= p.lastProcessedOffset {
+			if lastProducedOffset <= r.lastProcessedOffset {
 				break
 			}
 
 			if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
-				level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
+				level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", r.lastProcessedOffset, "offset_lag", lastProducedOffset-r.lastProcessedOffset)
 			}
 
 			timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
-			recordsChan <- p.poll(timedCtx)
+			recordsChan <- r.poll(timedCtx)
 			cancel()
 		}
 		if boff.Err() != nil {
@@ -424,7 +424,7 @@ func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Log
 	return log.With(logger, "current_lag", currLag)
 }
 
-func (p *Reader) startFetchLoop(ctx context.Context) chan []Record {
+func (r *Reader) startFetchLoop(ctx context.Context) chan []Record {
 	records := make(chan []Record)
 	go func() {
 		for {
@@ -432,7 +432,7 @@ func (p *Reader) startFetchLoop(ctx context.Context) chan []Record {
 			case <-ctx.Done():
 				return
 			default:
-				records <- p.poll(ctx)
+				records <- r.poll(ctx)
 			}
 		}
 	}()
@@ -440,7 +440,7 @@ func (p *Reader) startFetchLoop(ctx context.Context) chan []Record {
 }
 
 // logFetchErrors logs any errors encountered during the fetch operation.
-func (p *Reader) logFetchErrors(fetches kgo.Fetches) {
+func (r *Reader) logFetchErrors(fetches kgo.Fetches) {
 	mErr := multierror.New()
 	fetches.EachError(func(topic string, partition int32, err error) {
 		if errors.Is(err, context.Canceled) {
@@ -454,26 +454,26 @@ func (p *Reader) logFetchErrors(fetches kgo.Fetches) {
 	if len(mErr) == 0 {
 		return
 	}
-	p.metrics.fetchesErrors.Add(float64(len(mErr)))
-	level.Error(p.logger).Log("msg", "encountered error while fetching", "err", mErr.Err())
+	r.metrics.fetchesErrors.Add(float64(len(mErr)))
+	level.Error(r.logger).Log("msg", "encountered error while fetching", "err", mErr.Err())
 }
 
 // pollFetches retrieves the next batch of records from Kafka and measures the fetch duration.
-func (p *Reader) poll(ctx context.Context) []Record {
+func (r *Reader) poll(ctx context.Context) []Record {
 	defer func(start time.Time) {
-		p.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
+		r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
 	}(time.Now())
-	fetches := p.client.PollFetches(ctx)
-	p.recordFetchesMetrics(fetches)
-	p.logFetchErrors(fetches)
+	fetches := r.client.PollFetches(ctx)
+	r.recordFetchesMetrics(fetches)
+	r.logFetchErrors(fetches)
 	fetches = filterOutErrFetches(fetches)
 	if fetches.NumRecords() == 0 {
 		return nil
 	}
 	records := make([]Record, 0, fetches.NumRecords())
 	fetches.EachRecord(func(rec *kgo.Record) {
-		if rec.Partition != p.partitionID {
-			level.Error(p.logger).Log("msg", "wrong partition record received", "partition", rec.Partition, "expected_partition", p.partitionID)
+		if rec.Partition != r.partitionID {
+			level.Error(r.logger).Log("msg", "wrong partition record received", "partition", rec.Partition, "expected_partition", r.partitionID)
 			return
 		}
 		records = append(records, Record{
@@ -485,12 +485,12 @@ func (p *Reader) poll(ctx context.Context) []Record {
 			Offset: rec.Offset,
 		})
 	})
-	p.lastProcessedOffset = records[len(records)-1].Offset
+	r.lastProcessedOffset = records[len(records)-1].Offset
 	return records
 }
 
 // recordFetchesMetrics updates various metrics related to the fetch operation.
-func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) {
+func (r *Reader) recordFetchesMetrics(fetches kgo.Fetches) {
 	var (
 		now        = time.Now()
 		numRecords = 0
@@ -498,15 +498,15 @@ func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) {
 	fetches.EachRecord(func(record *kgo.Record) {
 		numRecords++
 		delay := now.Sub(record.Timestamp).Seconds()
-		if p.Service.State() == services.Starting {
-			p.metrics.receiveDelay.WithLabelValues(phaseStarting).Observe(delay)
+		if r.Service.State() == services.Starting {
+			r.metrics.receiveDelay.WithLabelValues(phaseStarting).Observe(delay)
 		} else {
-			p.metrics.receiveDelay.WithLabelValues(phaseRunning).Observe(delay)
+			r.metrics.receiveDelay.WithLabelValues(phaseRunning).Observe(delay)
 		}
 	})
 
-	p.metrics.fetchesTotal.Add(float64(len(fetches)))
-	p.metrics.recordsPerFetch.Observe(float64(numRecords))
+	r.metrics.fetchesTotal.Add(float64(len(fetches)))
+	r.metrics.recordsPerFetch.Observe(float64(numRecords))
 }
 
 // filterOutErrFetches removes any fetches that resulted in errors from the provided slice.
@@ -524,8 +524,8 @@ func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches {
 // isErrFetch checks if a given fetch resulted in any errors.
 func isErrFetch(fetch kgo.Fetch) bool {
 	for _, t := range fetch.Topics {
-		for _, p := range t.Partitions {
-			if p.Err != nil {
+		for _, r := range t.Partitions {
+			if r.Err != nil {
 				return true
 			}
 		}
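
For readers unfamiliar with the convention this patch enforces, the following is a minimal, hypothetical sketch (not code from the Loki repository) of the style the rename restores: every method of a struct uses the same short receiver name, so the receiver reads consistently across the type and a mechanical rename like this one stays behavior-neutral.

package main

import "fmt"

// reader is a hypothetical stand-in for the Reader struct touched by this patch.
type reader struct {
	partitionID int32
	offset      int64
}

// All methods use the same receiver name "r", mirroring the convention applied above.
func (r *reader) start() {
	r.offset = 0
}

func (r *reader) advance(n int64) {
	r.offset += n
}

func (r *reader) String() string {
	return fmt.Sprintf("partition=%d offset=%d", r.partitionID, r.offset)
}

func main() {
	r := &reader{partitionID: 1}
	r.start()
	r.advance(42)
	fmt.Println(r) // partition=1 offset=42
}

Go's code review guidance favors short, consistent receiver names; when they drift between methods of the same type (p in some, r in others), a rename of this kind restores consistency without changing behavior.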