Skip to content

Commit

Permalink
Merge pull request #1403 from openmeterio/feat/set-max-processing-time
Browse files Browse the repository at this point in the history
feat: set max processing time for sarama consumers
  • Loading branch information
turip authored Aug 21, 2024
2 parents 6c66767 + b7e54ef commit 11e0e6d
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions internal/watermill/driver/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

const (
// defaultMaxProcessinTime is the default maximum time a message is allowed to be processed before the
// partition assignment is lost by the consumer. For now we just set it to a high enough value (default 1s)
//
// Later we can make this configurable if needed.
defaultMaxProcessingTime = 5 * time.Minute
)

type SubscriberOptions struct {
Broker BrokerOptions
ConsumerGroupName string
Expand Down Expand Up @@ -37,6 +45,8 @@ func NewSubscriber(in SubscriberOptions) (message.Subscriber, error) {
return nil, err
}

saramaConfig.Consumer.MaxProcessingTime = defaultMaxProcessingTime

wmConfig := kafka.SubscriberConfig{
Brokers: []string{in.Broker.KafkaConfig.Broker},
OverwriteSaramaConfig: saramaConfig,
Expand Down

0 comments on commit 11e0e6d

Please sign in to comment.