
refactor(kafka-reader): Separates kafka constructs for better separation of concerns & reuse. #14982

Merged (8 commits) on Nov 22, 2024

Conversation

owen-d (Member) commented on Nov 16, 2024

Refactor Kafka Reader and Partition Committer

This PR refactors the Kafka ingestion pipeline components to achieve better separation of concerns and cleaner interfaces. The changes split the existing Reader into three distinct components:

  1. A ReaderIfc that handles pure Kafka interactions
  2. A partitionCommitter that manages async offset commits (already existed, but I've refactored it to use the interface)
  3. A ReaderService that coordinates the overall lifecycle

Key Changes

Note: As of now, this just adds 3 new files with refactored versions -- if we're happy I'll replace the existing ones, but the current structure mirrors how I developed against the prior versions.

Disclaimer: I haven't hooked this up to testware yet; if we're happy with this direction, I'll do that next. edit: done.

New Reader Interface

Created a focused interface for Kafka operations:

type ReaderIfc interface {
    Topic() string
    Partition() int32
    ConsumerGroup() string
    FetchLastCommittedOffset(ctx context.Context) (int64, error)
    FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error)
    Poll(ctx context.Context) ([]Record, error)
    Commit(ctx context.Context, offset int64) error
    // Set the target offset for consumption. Reads will begin from here.
    SetOffsetForConsumption(offset int64)
}

Improved Committer Design

  • Moved the committer to depend on ReaderIfc instead of containing Kafka logic
  • Maintains async commit functionality but delegates actual commits to the reader
  • Cleaner separation between offset management and Kafka operations
  • Added explicit SpecialOffset type to make partition offset fetching more type-safe

Service Lifecycle

  • ReaderService now coordinates between the reader and committer
  • Clearer separation of metrics between components
  • More explicit error handling throughout

Benefits

  • Maintainability: Responsibilities are clearly separated between Kafka operations, offset management, and service lifecycle
  • Flexibility: The ReaderIfc can be used independently of the service wrapper, which is how I started down this path. I'd like to reuse the underlying reader in the block-builder code.

@owen-d owen-d requested a review from a team as a code owner November 16, 2024 00:37
continue
}

if err := c.Commit(ctx, currOffset); err == nil {
Contributor:
Can we log the error when it's non-nil?

benclive (Contributor) left a comment:
Nice, thanks for doing this! It's definitely an improvement. I've left a few comments to address, but I'd be happy to approve once they're looked at.

lastCommittedOffset = int64(KafkaStartOffset)
}

if lastCommittedOffset > 0 {
Contributor:
Just spotted this now, but this should be >=, as 0 is a valid offset.

}
lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset, so we must subtract 1 to get the last written offset.

level.Debug(logger).Log("msg", "fetched latest offset information", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset)
Contributor:
Could you add lastCommittedOffset to this log line? We can then understand all the decisions made beyond this point as we frequently need to know what happened here.

// Ensure there are some records to consume. For example, if the partition has been inactive for a long
// time and all its records have been deleted, the partition start offset may be > 0 but there are no
// records to actually consume.
if partitionStartOffset > lastProducedOffset {
Contributor:
I wonder if this should also be >=. What happens if a partition is new and has had a single record written to it?

owen-d (Member, author):
Changing this causes TestPartitionReader_ProcessCommits to fail. I'd like to leave that debugging for a future PR and let this one simply refactor the existing structure, not the logic.

return
}
// Commit has internal timeouts, so this call shouldn't block for too long
_ = c.Commit(context.Background(), offset)
Contributor:
It might be useful to add an Info log here saying what the final committed offset was. I know we have a debug line in Commit(), but it's useful to know the final commit at shutdown even if debug logging is not enabled.

"go.uber.org/atomic"
)

type refactoredPartitionCommitter struct {
Contributor:
I'm not a big fan of "refactored..." as a name!

Contributor:
I think it will be removed when the old code is deleted? @owen-d

level.Debug(r.logger).Log(
"msg", "malformed response, setting to start offset",
)
return int64(KafkaStartOffset), nil
Contributor:
Like the other failure cases in this method, can we also return an error here and let the caller decide on the offset?

owen-d (Member, author):
I agree with you, but my goal here is to refactor the existing implementation, not change its logic. This is a good followup PR though.

Name: "loki_ingest_storage_reader_phase",
Help: "The current phase of the consumer.",
}, []string{"phase"}),
receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Contributor:
This is not used in the service; should we remove it?

benclive (Contributor) left a comment:
I don't see any issues with the refactor, and I'm happy to delay any logic / logging modifications to a future PR, so approving to unblock!

@owen-d owen-d merged commit 62e7d61 into grafana:main Nov 22, 2024
60 checks passed