feat: remove target lag and keep just maximum lag (#15120)
grobinson-grafana authored Nov 27, 2024
1 parent 65eda52 commit 12386a2
Showing 4 changed files with 67 additions and 135 deletions.
11 changes: 2 additions & 9 deletions docs/sources/shared/configuration.md
@@ -839,17 +839,10 @@ kafka_config:
# CLI flag: -kafka.producer-max-buffered-bytes
[producer_max_buffered_bytes: <int> | default = 1073741824]

# The best-effort maximum lag a consumer tries to achieve at startup. Set both
# -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup
# to 0 to disable waiting for maximum consumer lag being honored at startup.
# CLI flag: -kafka.target-consumer-lag-at-startup
[target_consumer_lag_at_startup: <duration> | default = 2s]

# The guaranteed maximum lag before a consumer is considered to have caught up
# reading from a partition at startup, becomes ACTIVE in the hash ring and
# passes the readiness check. Set both -kafka.target-consumer-lag-at-startup
# and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum
# consumer lag being honored at startup.
# passes the readiness check. Set -kafka.max-consumer-lag-at-startup to 0 to
# disable waiting for maximum consumer lag being honored at startup.
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

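For context (not part of the commit): a minimal sketch of how the one remaining lag flag is registered and overridden, using the RegisterFlagsWithPrefix function shown in the config.go diff below. The "example" flag-set name and the override value are illustrative, and the sketch assumes the github.com/grafana/loki/v3/pkg/kafka package is importable as in reader_service.go.

package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	var cfg kafka.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("kafka", fs)

	// The default is 15s: the reader replays the partition until it is within 15s of the
	// head before it becomes ACTIVE in the hash ring and passes the readiness check.
	// Setting the flag to 0 disables the startup wait entirely.
	_ = fs.Parse([]string{"-kafka.max-consumer-lag-at-startup=0"})

	fmt.Println(cfg.MaxConsumerLagAtStartup) // prints 0s
}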
15 changes: 2 additions & 13 deletions pkg/kafka/config.go
@@ -31,8 +31,6 @@ const (
var (
ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured")
ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured")
ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
)
@@ -59,8 +57,7 @@ type Config struct {
ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`

TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -88,8 +85,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

consumerLagUsage := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".target-consumer-lag-at-startup", prefix+".max-consumer-lag-at-startup")
f.DurationVar(&cfg.TargetConsumerLagAtStartup, prefix+".target-consumer-lag-at-startup", 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+consumerLagUsage)
consumerLagUsage := fmt.Sprintf("Set -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".max-consumer-lag-at-startup")
f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage)
}

@@ -103,13 +99,6 @@ func (cfg *Config) Validate() error {
if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit {
return ErrInvalidProducerMaxRecordSizeBytes
}
if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) {
return ErrInconsistentConsumerLagAtStartup
}
if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup {
return ErrInvalidMaxConsumerLagAtStartup
}

if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
return ErrInconsistentSASLUsernameAndPassword
}
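A side effect worth noting (again an illustration, not part of the diff): because the target-lag flag is no longer registered above, passing it now fails at flag-parsing time rather than being cross-checked in Validate. A quick sketch under the same assumptions as the previous example:

package main

import (
	"flag"
	"fmt"
	"io"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	var cfg kafka.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // suppress the usage dump for this sketch
	cfg.RegisterFlagsWithPrefix("kafka", fs)

	err := fs.Parse([]string{"-kafka.target-consumer-lag-at-startup=2s"})
	fmt.Println(err) // flag provided but not defined: -kafka.target-consumer-lag-at-startup
}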
173 changes: 62 additions & 111 deletions pkg/kafka/partition/reader_service.go
@@ -10,15 +10,12 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/kafka"
)

var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")

const (
kafkaStartOffset = -2
kafkaEndOffset = -1
@@ -65,7 +62,6 @@ type ReaderService struct {
}

type ReaderConfig struct {
TargetConsumerLagAtStartup time.Duration
MaxConsumerLagAtStartup time.Duration
ConsumerGroupOffsetCommitFreq time.Duration
}
@@ -92,7 +88,6 @@ func NewReaderService(
}
return newReaderService(
ReaderConfig{
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup,
MaxConsumerLagAtStartup: kafkaCfg.MaxConsumerLagAtStartup,
ConsumerGroupOffsetCommitFreq: kafkaCfg.ConsumerGroupOffsetCommitInterval,
},
@@ -151,33 +146,8 @@ func (s *ReaderService) starting(ctx context.Context) error {
level.Debug(s.logger).Log("msg", "consuming from offset", "offset", consumeOffset)
s.reader.SetOffsetForConsumption(consumeOffset)

return s.processConsumerLag(ctx)
}

func (s *ReaderService) processConsumerLag(ctx context.Context) error {
targetLag := s.cfg.TargetConsumerLagAtStartup
maxLag := s.cfg.MaxConsumerLagAtStartup

if targetLag > 0 && maxLag > 0 {
consumer, err := s.consumerFactory(s.committer)
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}

cancelCtx, cancel := context.WithCancel(ctx)
recordsChan := make(chan []Record)
wait := consumer.Start(cancelCtx, recordsChan)
defer func() {
close(recordsChan)
cancel()
wait()
}()

err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, maxLag, targetLag, recordsChan)
if err != nil {
level.Error(s.logger).Log("msg", "failed to catch up to max lag", "err", err)
return err
}
if err = s.processConsumerLagAtStartup(ctx); err != nil {
return fmt.Errorf("failed to process consumer lag at startup: %w", err)
}

return nil
@@ -202,89 +172,65 @@ func (s *ReaderService) running(ctx context.Context) error {
return nil
}

// processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored.
// This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be
// reached once this function successfully returns (only maxLag is guaranteed).
func (s *ReaderService) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error {
logger := log.With(s.logger, "target_lag", targetLag, "max_lag", maxLag)
level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored")

attempts := []func() (time.Duration, error){
// First process fetches until at least the max lag is honored.
func() (time.Duration, error) {
return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},

// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
// is honored) then we try to reach the (lower) target lag within a fixed time (best-effort).
// The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed
// from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously
// written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data
// written in the meanwhile.
func() (time.Duration, error) {
timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
defer cancel()

return s.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
},
func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
if s.cfg.MaxConsumerLagAtStartup <= 0 {
level.Debug(s.logger).Log("msg", "processing consumer lag at startup is disabled")
return nil
}

// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
func() (time.Duration, error) {
return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},
consumer, err := s.consumerFactory(s.committer)
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}

var currLag time.Duration
for _, attempt := range attempts {
var err error
cancelCtx, cancel := context.WithCancel(ctx)
recordsCh := make(chan []Record)
wait := consumer.Start(cancelCtx, recordsCh)
defer func() {
close(recordsCh)
cancel()
wait()
}()

currLag, err = attempt()
if errors.Is(err, errWaitTargetLagDeadlineExceeded) {
continue
}
if err != nil {
return err
}
if currLag <= targetLag {
level.Info(logger).Log(
"msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag",
"last_consumed_offset", s.committer.lastCommittedOffset,
"current_lag", currLag,
)
return nil
}
level.Debug(s.logger).Log("msg", "processing consumer lag at startup")
_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, s.logger, recordsCh, time.Since)
if err != nil {
level.Error(s.logger).Log("msg", "failed to catch up", "err", err)
return err
}
level.Debug(s.logger).Log("msg", "processing consumer lag at startup finished")

level.Warn(logger).Log(
"msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag",
"last_consumed_offset", s.committer.lastCommittedOffset,
"current_lag", currLag,
)
return nil
}

func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
boff := backoff.New(ctx, backoff.Config{
func (s *ReaderService) fetchUntilLagSatisfied(
ctx context.Context,
targetLag time.Duration,
logger log.Logger,
recordsCh chan<- []Record,
timeSince func(time.Time) time.Duration,
) (time.Duration, error) {
b := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded).
// Retry forever (unless context is canceled / deadline exceeded).
MaxRetries: 0,
})
currLag := time.Duration(0)
currentLag := time.Duration(0)

for boff.Ongoing() {
for b.Ongoing() {
// Send a direct request to the Kafka backend to fetch the partition start offset.
partitionStartOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaStartOffset)
if err != nil {
level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
boff.Wait()
b.Wait()
continue
}

consumerGroupLastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
if err != nil {
level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err)
boff.Wait()
b.Wait()
continue
}

@@ -295,17 +241,22 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
lastProducedOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaEndOffset)
if err != nil {
level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
boff.Wait()
b.Wait()
continue
}
lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.

// Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.
lastProducedOffset = lastProducedOffset - 1

level.Debug(logger).Log(
"msg", "fetched latest offset information",
"partition_start_offset", partitionStartOffset,
"last_produced_offset", lastProducedOffset,
"last_committed_offset", consumerGroupLastCommittedOffset,
)
"msg",
"fetched latest offset information",
"partition_start_offset",
partitionStartOffset,
"last_produced_offset",
lastProducedOffset,
"last_committed_offset",
consumerGroupLastCommittedOffset)

// Ensure there are some records to consume. For example, if the partition has been inactive for a long
// time and all its records have been deleted, the partition start offset may be > 0 but there are no
Expand All @@ -322,15 +273,15 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m

// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
// lag. If we don't have it (lag is zero value), then it will not be logged.
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)

for boff.Ongoing() {
for b.Ongoing() {
// Continue reading until we reached the desired offset.
if lastProducedOffset <= s.lastProcessedOffset {
break
}
if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
}

timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
@@ -342,23 +293,23 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
continue
}
if len(records) > 0 {
recordsChan <- records
recordsCh <- records
s.lastProcessedOffset = records[len(records)-1].Offset
}
}

if boff.Err() != nil {
return 0, boff.ErrCause()
if b.Err() != nil {
return 0, b.ErrCause()
}

// If it took less than the max desired lag to replay the partition
// then we can stop here, otherwise we'll have to redo it.
if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
return currLag, nil
if currentLag = timeSince(lastProducedOffsetRequestedAt); currentLag <= targetLag {
return currentLag, nil
}
}

return 0, boff.ErrCause()
return 0, b.ErrCause()
}

func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
@@ -399,10 +350,10 @@ func (s *serviceMetrics) reportRunning() {
s.phase.WithLabelValues(phaseRunning).Set(1)
}

func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger {
if currLag <= 0 {
func loggerWithCurrentLagIfSet(logger log.Logger, currentLag time.Duration) log.Logger {
if currentLag <= 0 {
return logger
}

return log.With(logger, "current_lag", currLag)
return log.With(logger, "current_lag", currentLag)
}
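To make the startup catch-up concrete, here is a toy model rather than the reader itself: the removed comments assumed roughly a 2x replay speed from Kafka, so each pass over the backlog takes about half as long as the span of data it covers, and fetchUntilLagSatisfied keeps looping until one full pass finishes within the configured maximum lag. The backlog size and replay speed below are invented.

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxLag = 15 * time.Second // -kafka.max-consumer-lag-at-startup

	backlog := 2 * time.Minute // data written to the partition while the reader was down
	for pass := 1; ; pass++ {
		// Replay everything produced before this pass started; at ~2x replay speed the
		// pass takes half as long as the span of data it covers.
		elapsed := backlog / 2
		// New records kept arriving while we replayed, so the leftover lag is the pass duration.
		backlog = elapsed
		fmt.Printf("pass %d: replayed in %s, remaining lag %s\n", pass, elapsed, backlog)
		if backlog <= maxLag {
			fmt.Println("caught up: the reader can become ACTIVE and pass the readiness check")
			return
		}
	}
}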
3 changes: 1 addition & 2 deletions pkg/kafka/partition/reader_test.go
@@ -168,7 +168,6 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
producer.ProduceSync(context.Background(), records...)

// Enable the catch up logic so starting the reader will read any existing records.
kafkaCfg.TargetConsumerLagAtStartup = time.Second * 1
kafkaCfg.MaxConsumerLagAtStartup = time.Second * 2

err = services.StartAndAwaitRunning(context.Background(), partitionReader)
@@ -246,7 +245,7 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
return targetLag - 1
}

_, err = readerSvc.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
_, err = readerSvc.fetchUntilLagSatisfied(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
assert.NoError(t, err)

// Wait to process all the records
