diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index f8a1b518facb..6c8ed01c5c0c 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -839,17 +839,10 @@ kafka_config: # CLI flag: -kafka.producer-max-buffered-bytes [producer_max_buffered_bytes: | default = 1073741824] - # The best-effort maximum lag a consumer tries to achieve at startup. Set both - # -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup - # to 0 to disable waiting for maximum consumer lag being honored at startup. - # CLI flag: -kafka.target-consumer-lag-at-startup - [target_consumer_lag_at_startup: | default = 2s] - # The guaranteed maximum lag before a consumer is considered to have caught up # reading from a partition at startup, becomes ACTIVE in the hash ring and - # passes the readiness check. Set both -kafka.target-consumer-lag-at-startup - # and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum - # consumer lag being honored at startup. + # passes the readiness check. Set -kafka.max-consumer-lag-at-startup to 0 to + # disable waiting for maximum consumer lag being honored at startup. # CLI flag: -kafka.max-consumer-lag-at-startup [max_consumer_lag_at_startup: | default = 15s] diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 09008bec9341..aba0a5e2bbdd 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -31,8 +31,6 @@ const ( var ( ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") - ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") - ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) ) @@ -59,8 +57,7 @@ type Config struct { ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` - TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` - MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` + MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -88,8 +85,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.") f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") - consumerLagUsage := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".target-consumer-lag-at-startup", prefix+".max-consumer-lag-at-startup") - f.DurationVar(&cfg.TargetConsumerLagAtStartup, prefix+".target-consumer-lag-at-startup", 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+consumerLagUsage) + consumerLagUsage := fmt.Sprintf("Set -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".max-consumer-lag-at-startup") f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage) } @@ -103,13 +99,6 @@ func (cfg *Config) Validate() error { if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit { return ErrInvalidProducerMaxRecordSizeBytes } - if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) { - return ErrInconsistentConsumerLagAtStartup - } - if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup { - return ErrInvalidMaxConsumerLagAtStartup - } - if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") { return ErrInconsistentSASLUsernameAndPassword } diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index 6cd1578da27c..0c65807f4966 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -10,15 +10,12 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/services" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/v3/pkg/kafka" ) -var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded") - const ( kafkaStartOffset = -2 kafkaEndOffset = -1 @@ -65,7 +62,6 @@ type ReaderService struct { } type ReaderConfig struct { - TargetConsumerLagAtStartup time.Duration MaxConsumerLagAtStartup time.Duration ConsumerGroupOffsetCommitFreq time.Duration } @@ -92,7 +88,6 @@ func NewReaderService( } return newReaderService( ReaderConfig{ - TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup, MaxConsumerLagAtStartup: kafkaCfg.MaxConsumerLagAtStartup, ConsumerGroupOffsetCommitFreq: kafkaCfg.ConsumerGroupOffsetCommitInterval, }, @@ -151,33 +146,8 @@ func (s *ReaderService) starting(ctx context.Context) error { level.Debug(s.logger).Log("msg", "consuming from offset", "offset", consumeOffset) s.reader.SetOffsetForConsumption(consumeOffset) - return s.processConsumerLag(ctx) -} - -func (s *ReaderService) processConsumerLag(ctx context.Context) error { - targetLag := s.cfg.TargetConsumerLagAtStartup - maxLag := s.cfg.MaxConsumerLagAtStartup - - if targetLag > 0 && maxLag > 0 { - consumer, err := s.consumerFactory(s.committer) - if err != nil { - return fmt.Errorf("failed to create consumer: %w", err) - } - - cancelCtx, cancel := context.WithCancel(ctx) - recordsChan := make(chan []Record) - wait := consumer.Start(cancelCtx, recordsChan) - defer func() { - close(recordsChan) - cancel() - wait() - }() - - err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, maxLag, targetLag, recordsChan) - if err != nil { - level.Error(s.logger).Log("msg", "failed to catch up to max lag", "err", err) - return err - } + if err = s.processConsumerLagAtStartup(ctx); err != nil { + return fmt.Errorf("failed to process consumer lag at startup: %w", err) } return nil @@ -202,89 +172,65 @@ func (s *ReaderService) running(ctx context.Context) error { return nil } -// processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored. -// This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be -// reached once this function successfully returns (only maxLag is guaranteed). -func (s *ReaderService) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error { - logger := log.With(s.logger, "target_lag", targetLag, "max_lag", maxLag) - level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored") - - attempts := []func() (time.Duration, error){ - // First process fetches until at least the max lag is honored. - func() (time.Duration, error) { - return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since) - }, - - // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag - // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). - // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed - // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously - // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data - // written in the meanwhile. - func() (time.Duration, error) { - timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) - defer cancel() - - return s.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since) - }, +func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error { + if s.cfg.MaxConsumerLagAtStartup <= 0 { + level.Debug(s.logger).Log("msg", "processing consumer lag at startup is disabled") + return nil + } - // If the target lag hasn't been reached with the previous attempt then we'll move on. However, - // we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored. - func() (time.Duration, error) { - return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since) - }, + consumer, err := s.consumerFactory(s.committer) + if err != nil { + return fmt.Errorf("failed to create consumer: %w", err) } - var currLag time.Duration - for _, attempt := range attempts { - var err error + cancelCtx, cancel := context.WithCancel(ctx) + recordsCh := make(chan []Record) + wait := consumer.Start(cancelCtx, recordsCh) + defer func() { + close(recordsCh) + cancel() + wait() + }() - currLag, err = attempt() - if errors.Is(err, errWaitTargetLagDeadlineExceeded) { - continue - } - if err != nil { - return err - } - if currLag <= targetLag { - level.Info(logger).Log( - "msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag", - "last_consumed_offset", s.committer.lastCommittedOffset, - "current_lag", currLag, - ) - return nil - } + level.Debug(s.logger).Log("msg", "processing consumer lag at startup") + _, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, s.logger, recordsCh, time.Since) + if err != nil { + level.Error(s.logger).Log("msg", "failed to catch up", "err", err) + return err } + level.Debug(s.logger).Log("msg", "processing consumer lag at startup finished") - level.Warn(logger).Log( - "msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag", - "last_consumed_offset", s.committer.lastCommittedOffset, - "current_lag", currLag, - ) return nil } -func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) { - boff := backoff.New(ctx, backoff.Config{ +func (s *ReaderService) fetchUntilLagSatisfied( + ctx context.Context, + targetLag time.Duration, + logger log.Logger, + recordsCh chan<- []Record, + timeSince func(time.Time) time.Duration, +) (time.Duration, error) { + b := backoff.New(ctx, backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, - MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded). + // Retry forever (unless context is canceled / deadline exceeded). + MaxRetries: 0, }) - currLag := time.Duration(0) + currentLag := time.Duration(0) - for boff.Ongoing() { + for b.Ongoing() { // Send a direct request to the Kafka backend to fetch the partition start offset. partitionStartOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaStartOffset) if err != nil { level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) - boff.Wait() + b.Wait() continue } consumerGroupLastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx) if err != nil { level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err) - boff.Wait() + b.Wait() continue } @@ -295,17 +241,22 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m lastProducedOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaEndOffset) if err != nil { level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) - boff.Wait() + b.Wait() continue } - lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset. + + // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset. + lastProducedOffset = lastProducedOffset - 1 level.Debug(logger).Log( - "msg", "fetched latest offset information", - "partition_start_offset", partitionStartOffset, - "last_produced_offset", lastProducedOffset, - "last_committed_offset", consumerGroupLastCommittedOffset, - ) + "msg", + "fetched latest offset information", + "partition_start_offset", + partitionStartOffset, + "last_produced_offset", + lastProducedOffset, + "last_committed_offset", + consumerGroupLastCommittedOffset) // Ensure there are some records to consume. For example, if the partition has been inactive for a long // time and all its records have been deleted, the partition start offset may be > 0 but there are no @@ -322,15 +273,15 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m // This message is NOT expected to be logged with a very high rate. In this log we display the last measured // lag. If we don't have it (lag is zero value), then it will not be logged. - level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset) + level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset) - for boff.Ongoing() { + for b.Ongoing() { // Continue reading until we reached the desired offset. if lastProducedOffset <= s.lastProcessedOffset { break } if time.Since(lastProducedOffsetRequestedAt) > time.Minute { - level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset) + level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset) } timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -342,23 +293,23 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m continue } if len(records) > 0 { - recordsChan <- records + recordsCh <- records s.lastProcessedOffset = records[len(records)-1].Offset } } - if boff.Err() != nil { - return 0, boff.ErrCause() + if b.Err() != nil { + return 0, b.ErrCause() } // If it took less than the max desired lag to replay the partition // then we can stop here, otherwise we'll have to redo it. - if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag { - return currLag, nil + if currentLag = timeSince(lastProducedOffsetRequestedAt); currentLag <= targetLag { + return currentLag, nil } } - return 0, boff.ErrCause() + return 0, b.ErrCause() } func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record { @@ -399,10 +350,10 @@ func (s *serviceMetrics) reportRunning() { s.phase.WithLabelValues(phaseRunning).Set(1) } -func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger { - if currLag <= 0 { +func loggerWithCurrentLagIfSet(logger log.Logger, currentLag time.Duration) log.Logger { + if currentLag <= 0 { return logger } - return log.With(logger, "current_lag", currLag) + return log.With(logger, "current_lag", currentLag) } diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index dc4b49f0523d..ab7f270a8024 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -168,7 +168,6 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) { producer.ProduceSync(context.Background(), records...) // Enable the catch up logic so starting the reader will read any existing records. - kafkaCfg.TargetConsumerLagAtStartup = time.Second * 1 kafkaCfg.MaxConsumerLagAtStartup = time.Second * 2 err = services.StartAndAwaitRunning(context.Background(), partitionReader) @@ -246,7 +245,7 @@ func TestPartitionReader_ProcessCommits(t *testing.T) { return targetLag - 1 } - _, err = readerSvc.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince) + _, err = readerSvc.fetchUntilLagSatisfied(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince) assert.NoError(t, err) // Wait to process all the records