chore: Remove ifc from interface names #15072

Merged: 1 commit on Nov 22, 2024
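
The rename follows the usual Go convention: the interface keeps the plain name and the concrete type carries the qualifier. A condensed sketch of the pattern this PR applies (method and field lists abbreviated; the full definitions are in the pkg/kafka/partition/reader.go diff below):

    // Before the rename (condensed):
    //
    //     type ReaderIfc interface { Topic() string /* ... */ }
    //     type Reader struct { /* Kafka-backed implementation */ }
    //
    // After the rename, the interface owns the plain name and the
    // concrete type is qualified instead:
    type Reader interface {
        Topic() string
        // ...
    }

    type StdReader struct {
        // Kafka-backed implementation
    }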
4 changes: 2 additions & 2 deletions pkg/blockbuilder/controller.go
@@ -65,13 +65,13 @@ type PartitionController interface {
 // containing log data and "committed" is the consumer group
 type PartitionJobController struct {
     stepLen int64
-    part    partition.ReaderIfc
+    part    partition.Reader
     backoff backoff.Config
     decoder *kafka.Decoder
 }
 
 func NewPartitionJobController(
-    controller partition.ReaderIfc,
+    controller partition.Reader,
     backoff backoff.Config,
 ) (*PartitionJobController, error) {
     decoder, err := kafka.NewDecoder()
4 changes: 2 additions & 2 deletions pkg/kafka/partition/committer.go
@@ -26,15 +26,15 @@ type partitionCommitter struct {
     lastCommittedOffset prometheus.Gauge
 
     logger     log.Logger
-    reader     ReaderIfc
+    reader     Reader
     commitFreq time.Duration
 
     toCommit *atomic.Int64
     wg       sync.WaitGroup
     cancel   context.CancelFunc
 }
 
-func newCommitter(reader ReaderIfc, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
+func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
     c := &partitionCommitter{
         logger: logger,
         reader: reader,
2 changes: 1 addition & 1 deletion pkg/kafka/partition/committer_test.go
@@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) {
     reg := prometheus.NewRegistry()
     partitionID := int32(1)
     consumerGroup := "test-consumer-group"
-    reader := newReader(
+    reader := newStdReader(
         client,
         kafkaCfg.Topic,
         partitionID,
36 changes: 18 additions & 18 deletions pkg/kafka/partition/reader.go
@@ -37,7 +37,7 @@ type Record struct {
     Offset int64
 }
 
-type ReaderIfc interface {
+type Reader interface {
     Topic() string
     Partition() int32
     ConsumerGroup() string
@@ -91,8 +91,8 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
     }
 }
 
-// Reader provides low-level access to Kafka partition reading operations
-type Reader struct {
+// StdReader provides low-level access to Kafka partition reading operations
+type StdReader struct {
     client      *kgo.Client
     topic       string
     partitionID int32
@@ -101,13 +101,13 @@ type Reader struct {
     logger log.Logger
 }
 
-func NewReader(
+func NewStdReader(
     cfg kafka.Config,
     partitionID int32,
     instanceID string,
     logger log.Logger,
     reg prometheus.Registerer,
-) (*Reader, error) {
+) (*StdReader, error) {
     // Create a new Kafka client for this reader
     clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
     c, err := client.NewReaderClient(
@@ -120,7 +120,7 @@ func NewReader(
     }
 
     // Create the reader
-    return newReader(
+    return newStdReader(
         c,
         cfg.Topic,
         partitionID,
@@ -130,16 +130,16 @@ func NewReader(
     ), nil
 }
 
-// NewReader creates a new Reader instance
-func newReader(
+// newStdReader creates a new StdReader instance
+func newStdReader(
     client *kgo.Client,
     topic string,
     partitionID int32,
     consumerGroup string,
     logger log.Logger,
     reg prometheus.Registerer,
-) *Reader {
-    return &Reader{
+) *StdReader {
+    return &StdReader{
         client:      client,
         topic:       topic,
         partitionID: partitionID,
@@ -150,22 +150,22 @@ func newReader(
 }
 
 // Topic returns the topic being read
-func (r *Reader) Topic() string {
+func (r *StdReader) Topic() string {
     return r.topic
 }
 
 // Partition returns the partition being read
-func (r *Reader) Partition() int32 {
+func (r *StdReader) Partition() int32 {
     return r.partitionID
 }
 
 // ConsumerGroup returns the consumer group
-func (r *Reader) ConsumerGroup() string {
+func (r *StdReader) ConsumerGroup() string {
     return r.consumerGroup
 }
 
 // FetchLastCommittedOffset retrieves the last committed offset for this partition
-func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
+func (r *StdReader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
     req := kmsg.NewPtrOffsetFetchRequest()
     req.Topics = []kmsg.OffsetFetchRequestTopic{{
         Topic: r.topic,
@@ -210,7 +210,7 @@ func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
 }
 
 // FetchPartitionOffset retrieves the offset for a specific position
-func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
+func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
     partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
     partitionReq.Partition = r.partitionID
     partitionReq.Timestamp = int64(position)
@@ -257,7 +257,7 @@ func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
 }
 
 // Poll retrieves the next batch of records from Kafka
-func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
+func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
     start := time.Now()
     fetches := r.client.PollFetches(ctx)
     r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
@@ -303,14 +303,14 @@ func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
     return records, nil
 }
 
-func (r *Reader) SetOffsetForConsumption(offset int64) {
+func (r *StdReader) SetOffsetForConsumption(offset int64) {
     r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
         r.topic: {r.partitionID: kgo.NewOffset().At(offset)},
     })
 }
 
 // Commit commits an offset to the consumer group
-func (r *Reader) Commit(ctx context.Context, offset int64) error {
+func (r *StdReader) Commit(ctx context.Context, offset int64) error {
     admin := kadm.NewClient(r.client)
 
     // Commit the last consumed offset.
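
With StdReader now the concrete implementation of the renamed Reader interface, a compile-time assertion is a cheap guard against the two drifting apart after a rename like this. The idiom below is not part of this PR, just a common Go pattern:

    // Not in this PR: fails compilation if *StdReader ever stops
    // satisfying the Reader interface.
    var _ Reader = (*StdReader)(nil)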
6 changes: 3 additions & 3 deletions pkg/kafka/partition/reader_service.go
@@ -55,7 +55,7 @@ type ReaderService struct {
     services.Service
 
     cfg             ReaderConfig
-    reader          ReaderIfc
+    reader          Reader
     consumerFactory ConsumerFactory
     logger          log.Logger
     metrics         *serviceMetrics
@@ -82,7 +82,7 @@ func NewReaderService(
 ) (*ReaderService, error) {
 
     // Create the reader
-    reader, err := NewReader(
+    reader, err := NewStdReader(
         kafkaCfg,
         partitionID,
         instanceID,
@@ -109,7 +109,7 @@ func NewReaderService(
 
 func newReaderServiceFromIfc(
     cfg ReaderConfig,
-    reader ReaderIfc,
+    reader Reader,
     consumerFactory ConsumerFactory,
     logger log.Logger,
     reg prometheus.Registerer,
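
Because newReaderServiceFromIfc accepts the Reader interface rather than the concrete StdReader, tests can inject a lightweight fake. A hypothetical in-memory stub, with the method set inferred from the reader.go hunks above (signatures not visible in the diff are assumptions):

    // Hypothetical test stub; not part of this PR. The method set mirrors
    // the (*StdReader) methods visible in the reader.go diff.
    type stubReader struct {
        records   []Record
        committed int64
    }

    func (s *stubReader) Topic() string         { return "test-topic" }
    func (s *stubReader) Partition() int32      { return 0 }
    func (s *stubReader) ConsumerGroup() string { return "test-group" }

    func (s *stubReader) FetchLastCommittedOffset(_ context.Context) (int64, error) {
        return s.committed, nil
    }

    func (s *stubReader) FetchPartitionOffset(_ context.Context, _ SpecialOffset) (int64, error) {
        return int64(len(s.records)), nil
    }

    func (s *stubReader) Poll(_ context.Context) ([]Record, error) {
        out := s.records
        s.records = nil // drain buffered records on each poll
        return out, nil
    }

    func (s *stubReader) SetOffsetForConsumption(offset int64) {}

    func (s *stubReader) Commit(_ context.Context, offset int64) error {
        s.committed = offset
        return nil
    }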
2 changes: 1 addition & 1 deletion pkg/kafka/partition/reader_test.go
@@ -63,7 +63,7 @@ func readersFromKafkaCfg(
     kafkaCfg kafka.Config,
     consumerFactory ConsumerFactory,
     partition int32,
-) (ReaderIfc, *ReaderService) {
+) (Reader, *ReaderService) {
     partitionReader, err := NewReaderService(
         kafkaCfg,
         partition,
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -1791,7 +1791,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
         return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
     }
 
-    reader, err := partition.NewReader(
+    reader, err := partition.NewStdReader(
         t.Cfg.KafkaConfig,
         ingestPartitionID,
         id,
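
For call sites like this one, only the constructor name changes; the signature is untouched. An illustrative fragment of the updated call (variable names are assumptions; the NewStdReader signature is taken from the reader.go diff above):

    // Illustrative call site, not taken verbatim from the PR.
    reader, err := partition.NewStdReader(
        cfg.KafkaConfig,              // kafka.Config
        partitionID,                  // int32: partition this reader consumes
        instanceID,                   // string: consumer instance ID
        logger,                       // log.Logger
        prometheus.DefaultRegisterer, // prometheus.Registerer
    )
    if err != nil {
        return nil, fmt.Errorf("creating partition reader: %w", err)
    }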