Skip to content

Commit

Permalink
moves partition client creation into reader from readersvc + reintegr…
Browse files Browse the repository at this point in the history
…ates blockbuilder
  • Loading branch information
owen-d committed Nov 22, 2024
1 parent 6a951c0 commit 6c7233b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 26 deletions.
12 changes: 6 additions & 6 deletions pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func TestPartitionCommitter(t *testing.T) {
require.NoError(t, err)

// Verify metrics
assert.Equal(t, float64(1), testutil.ToFloat64(committer.metrics.CommitRequestsTotal))
assert.Equal(t, float64(0), testutil.ToFloat64(committer.metrics.CommitFailuresTotal))
assert.Equal(t, float64(testOffset), testutil.ToFloat64(committer.metrics.LastCommittedOffset))
assert.Equal(t, float64(1), testutil.ToFloat64(committer.commitRequestsTotal))
assert.Equal(t, float64(0), testutil.ToFloat64(committer.commitFailuresTotal))
assert.Equal(t, float64(testOffset), testutil.ToFloat64(committer.lastCommittedOffset))

// Verify committed offset
offsets, err := admClient.FetchOffsets(context.Background(), consumerGroup)
Expand All @@ -72,9 +72,9 @@ func TestPartitionCommitter(t *testing.T) {
require.NoError(t, err)

// Verify updated metrics
assert.Equal(t, float64(2), testutil.ToFloat64(committer.metrics.CommitRequestsTotal))
assert.Equal(t, float64(0), testutil.ToFloat64(committer.metrics.CommitFailuresTotal))
assert.Equal(t, float64(newTestOffset), testutil.ToFloat64(committer.metrics.LastCommittedOffset))
assert.Equal(t, float64(2), testutil.ToFloat64(committer.commitRequestsTotal))
assert.Equal(t, float64(0), testutil.ToFloat64(committer.commitFailuresTotal))
assert.Equal(t, float64(newTestOffset), testutil.ToFloat64(committer.lastCommittedOffset))

// Verify updated committed offset
offsets, err = admClient.FetchOffsets(context.Background(), consumerGroup)
Expand Down
32 changes: 32 additions & 0 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/grafana/loki/v3/pkg/kafka/client"
)

type SpecialOffset int
Expand Down Expand Up @@ -97,6 +100,35 @@ type Reader struct {
logger log.Logger
}

func NewReader(
cfg kafka.Config,
partitionID int32,
instanceID string,
logger log.Logger,
reg prometheus.Registerer,
) (*Reader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
cfg,
clientMetrics,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}

// Create the reader
return newReader(
c,
cfg.Topic,
partitionID,
cfg.GetConsumerGroup(instanceID, partitionID),
logger,
reg,
), nil
}

// NewReader creates a new Reader instance
func newReader(
client *kgo.Client,
Expand Down
22 changes: 7 additions & 15 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
)

var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")
Expand Down Expand Up @@ -81,27 +80,20 @@ func NewReaderService(
logger log.Logger,
reg prometheus.Registerer,
) (*ReaderService, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
kafkaCfg,
clientMetrics,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}

// Create the reader
reader := newReader(
c,
kafkaCfg.Topic,
reader, err := NewReader(
kafkaCfg,
partitionID,
kafkaCfg.GetConsumerGroup(instanceID, partitionID),
instanceID,
logger,
reg,
)

if err != nil {
return nil, errors.Wrap(err, "creating kafka reader")
}

return newReaderServiceFromIfc(
ReaderConfig{
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup,
Expand Down
17 changes: 12 additions & 5 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
Expand Down Expand Up @@ -1790,9 +1791,8 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}

reader, err := blockbuilder.NewPartitionReader(
reader, err := partition.NewReader(
t.Cfg.KafkaConfig,
t.Cfg.BlockBuilder.Backoff,
ingestPartitionID,
id,
logger,
Expand All @@ -1802,6 +1802,15 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, err
}

controller, err := blockbuilder.NewPartitionJobController(
reader,
t.Cfg.BlockBuilder.Backoff,
)

if err != nil {
return nil, err
}

objectStore, err := blockbuilder.NewMultiStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, err
Expand All @@ -1815,9 +1824,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
objectStore,
logger,
prometheus.DefaultRegisterer,
blockbuilder.NewPartitionJobController(
reader,
),
controller,
)

if err != nil {
Expand Down

0 comments on commit 6c7233b

Please sign in to comment.