Skip to content

Commit

Permalink
refactor(kafka-reader): Separates kafka constructs for better separat…
Browse files Browse the repository at this point in the history
…ion of concerns & reuse. (#14982)
  • Loading branch information
owen-d authored Nov 22, 2024
1 parent 0c24a70 commit 62e7d61
Show file tree
Hide file tree
Showing 18 changed files with 714 additions and 2,271 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ require (
github.com/IBM/ibm-cos-sdk-go v1.11.1
github.com/axiomhq/hyperloglog v0.2.0
github.com/buger/jsonparser v1.1.1
github.com/coder/quartz v0.1.2
github.com/d4l3k/messagediff v1.2.1
github.com/dolthub/swiss v0.2.1
github.com/efficientgo/core v1.0.0-rc.3
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1101,8 +1101,6 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1Ig
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coder/quartz v0.1.2 h1:PVhc9sJimTdKd3VbygXtS4826EOCpB1fXoRlLnCrE+s=
github.com/coder/quartz v0.1.2/go.mod h1:vsiCc+AHViMKH2CQpGIpFgdHIEQsxwm8yCscqKmzbRA=
github.com/containerd/containerd v1.4.1/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/fifo v1.1.0 h1:4I2mbh5stb1u6ycIABlBw9zgtlK8viPI9QkQNRQEEmY=
github.com/containerd/fifo v1.1.0/go.mod h1:bmC4NWMbXlt2EZ0Hc7Fx7QzTFxgPID13eH0Qu+MAb2o=
Expand Down
11 changes: 9 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ type Ingester struct {

ingestPartitionID int32
partitionRingLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.Reader
partitionReader *partition.ReaderService
}

// New makes a new Ingester.
Expand Down Expand Up @@ -380,7 +380,14 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
logger,
prometheus.WrapRegistererWithPrefix("loki_", registerer))

i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger, registerer), logger, registerer)
i.partitionReader, err = partition.NewReaderService(
cfg.KafkaIngestion.KafkaConfig,
i.ingestPartitionID,
cfg.LifecyclerConfig.ID,
NewKafkaConsumerFactory(i, logger, registerer),
logger,
registerer,
)
if err != nil {
return nil, err
}
Expand Down
110 changes: 46 additions & 64 deletions pkg/kafka/partition/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/kafka"
)

// Committer defines an interface for committing offsets
Expand All @@ -22,49 +19,40 @@ type Committer interface {
EnqueueOffset(offset int64)
}

// partitionCommitter is responsible for committing offsets for a specific Kafka partition
// to the Kafka broker. It also tracks metrics related to the commit process.
type partitionCommitter struct {
commitRequestsTotal prometheus.Counter
commitRequestsLatency prometheus.Histogram
commitFailuresTotal prometheus.Counter
lastCommittedOffset prometheus.Gauge

logger log.Logger
admClient *kadm.Client

kafkaCfg kafka.Config
partitionID int32
consumerGroup string
logger log.Logger
reader ReaderIfc
commitFreq time.Duration

toCommit *atomic.Int64
wg sync.WaitGroup
cancel context.CancelFunc
}

// newCommitter creates and initializes a new Committer.
// It sets up the necessary metrics and initializes the committer with the provided configuration.
func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
func newCommitter(reader ReaderIfc, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
kafkaCfg: kafkaCfg,
partitionID: partitionID,
consumerGroup: consumerGroup,
admClient: admClient,
logger: logger,
reader: reader,
commitFreq: commitFreq,
commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_ingest_storage_reader_offset_commit_requests_total",
Help: "Total number of requests issued to commit the last consumed offset (includes both successful and failed requests).",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
}),
commitFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_ingest_storage_reader_offset_commit_failures_total",
Help: "Total number of failed requests to commit the last consumed offset.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
}),
commitRequestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingest_storage_reader_offset_commit_request_duration_seconds",
Help: "The duration of requests to commit the last consumed offset.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: time.Hour,
Expand All @@ -73,15 +61,15 @@ func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int
lastCommittedOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingest_storage_reader_last_committed_offset",
Help: "The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
}),
toCommit: atomic.NewInt64(-1),
}

// Initialise the last committed offset metric to -1 to signal no offset has been committed yet (0 is a valid offset).
// Initialize the last committed offset metric to -1 to signal no offset has been committed yet
c.lastCommittedOffset.Set(-1)

if kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 {
if commitFreq > 0 {
c.wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
Expand All @@ -91,77 +79,71 @@ func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int
return c
}

func (r *partitionCommitter) autoCommitLoop(ctx context.Context) {
defer r.wg.Done()
commitTicker := time.NewTicker(r.kafkaCfg.ConsumerGroupOffsetCommitInterval)
func (c *partitionCommitter) autoCommitLoop(ctx context.Context) {
defer c.wg.Done()
commitTicker := time.NewTicker(c.commitFreq)
defer commitTicker.Stop()

previousOffset := r.toCommit.Load()
previousOffset := c.toCommit.Load()
for {
select {
case <-ctx.Done():
return
case <-commitTicker.C:
currOffset := r.toCommit.Load()
currOffset := c.toCommit.Load()
if currOffset == previousOffset {
continue
}

if err := r.Commit(ctx, currOffset); err == nil {
if err := c.Commit(ctx, currOffset); err == nil {
level.Error(c.logger).Log("msg", "failed to commit", "offset", currOffset)
c.lastCommittedOffset.Set(float64(currOffset))
previousOffset = currOffset
}
}
}
}

func (r *partitionCommitter) EnqueueOffset(o int64) {
if r.kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 {
r.toCommit.Store(o)
func (c *partitionCommitter) EnqueueOffset(o int64) {
if c.commitFreq > 0 {
c.toCommit.Store(o)
}
}

// commit attempts to commit the given offset to Kafka for the partition this committer is responsible for.
// It updates relevant metrics and logs the result of the commit operation.
func (r *partitionCommitter) Commit(ctx context.Context, offset int64) (returnErr error) {
func (c *partitionCommitter) Commit(ctx context.Context, offset int64) error {
startTime := time.Now()
r.commitRequestsTotal.Inc()
c.commitRequestsTotal.Inc()

defer func() {
r.commitRequestsLatency.Observe(time.Since(startTime).Seconds())

if returnErr != nil {
level.Error(r.logger).Log("msg", "failed to commit last consumed offset to Kafka", "err", returnErr, "offset", offset)
r.commitFailuresTotal.Inc()
}
}()

// Commit the last consumed offset.
toCommit := kadm.Offsets{}
toCommit.AddOffset(r.kafkaCfg.Topic, r.partitionID, offset, -1)
committed, err := r.admClient.CommitOffsets(ctx, r.consumerGroup, toCommit)
if err != nil {
if err := c.reader.Commit(ctx, offset); err != nil {
level.Error(c.logger).Log("msg", "failed to commit offset", "err", err, "offset", offset)
c.commitFailuresTotal.Inc()
c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
return err
} else if !committed.Ok() {
return committed.Error()
}

committedOffset, _ := committed.Lookup(r.kafkaCfg.Topic, r.partitionID)
level.Debug(r.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
r.lastCommittedOffset.Set(float64(committedOffset.At))
level.Debug(c.logger).Log("msg", "successfully committed offset", "offset", offset)
c.lastCommittedOffset.Set(float64(offset))
c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
return nil
}

func (r *partitionCommitter) Stop() {
if r.kafkaCfg.ConsumerGroupOffsetCommitInterval <= 0 {
func (c *partitionCommitter) Stop() {
if c.commitFreq <= 0 {
return
}
r.cancel()
r.wg.Wait()
c.cancel()
c.wg.Wait()

offset := r.toCommit.Load()
offset := c.toCommit.Load()
if offset < 0 {
return
}
// Commit has internal timeouts, so this call shouldn't block for too long.
_ = r.Commit(context.Background(), offset)

// Commit has internal timeouts, so this call shouldn't block for too long
logger := log.With(c.logger, "msg", "stopping partition committer", "final_offset", offset)
if err := c.Commit(context.Background(), offset); err != nil {
level.Error(logger).Log("err", err)
} else {
level.Info(logger).Log()
}
}
10 changes: 9 additions & 1 deletion pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ func TestPartitionCommitter(t *testing.T) {
reg := prometheus.NewRegistry()
partitionID := int32(1)
consumerGroup := "test-consumer-group"
committer := newCommitter(kafkaCfg, admClient, partitionID, consumerGroup, logger, reg)
reader := newReader(
client,
kafkaCfg.Topic,
partitionID,
consumerGroup,
logger,
reg,
)
committer := newCommitter(reader, kafkaCfg.ConsumerGroupOffsetCommitInterval, logger, reg)

// Test committing an offset
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
79 changes: 0 additions & 79 deletions pkg/kafka/partition/metrics.go

This file was deleted.

Loading

0 comments on commit 62e7d61

Please sign in to comment.