Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Nov 18, 2024
1 parent 237eaa6 commit 5813b62
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 13 deletions.
10 changes: 2 additions & 8 deletions pkg/kafka/partition/reader_refactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const (
)

type ReaderIfc interface {
Client() *kgo.Client
Topic() string
Partition() int32
ConsumerGroup() string
Expand All @@ -48,7 +47,7 @@ type refactoredReaderMetrics struct {
lastCommittedOffset prometheus.Gauge
}

func newRefactoredReaderMetrics(r prometheus.Registerer, partitionID int32) *refactoredReaderMetrics {
func newRefactoredReaderMetrics(r prometheus.Registerer) *refactoredReaderMetrics {
return &refactoredReaderMetrics{
fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_kafka_reader_fetch_wait_duration_seconds",
Expand Down Expand Up @@ -104,16 +103,11 @@ func NewRefactoredReader(
topic: topic,
partitionID: partitionID,
consumerGroup: consumerGroup,
metrics: newRefactoredReaderMetrics(reg, partitionID),
metrics: newRefactoredReaderMetrics(reg),
logger: logger,
}
}

// Client returns the underlying Kafka client
func (r *RefactoredReader) Client() *kgo.Client {
return r.client
}

// Topic returns the topic being read
func (r *RefactoredReader) Topic() string {
return r.topic
Expand Down
8 changes: 3 additions & 5 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
Expand Down Expand Up @@ -65,14 +64,13 @@ func readersFromKafkaCfg(
consumerFactory ConsumerFactory,
partition int32,
) (*RefactoredReader, *ReaderService) {
partitionID := int32(0)
c, err := client.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger())
c, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
r := NewRefactoredReader(
c,
kafkaCfg.Topic,
partitionID,
kafkaCfg.GetConsumerGroup("test-consumer-group", partitionID),
partition,
kafkaCfg.GetConsumerGroup("test-consumer-group", partition),
log.NewNopLogger(),
nil,
)
Expand Down

0 comments on commit 5813b62

Please sign in to comment.