-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18538: Add Streams membership manager #18551
base: trunk
Are you sure you want to change the base?
Conversation
c58c298
to
67dd512
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a first pass on the membership manager, but this is one massive PR. I am not done yet. But a more general point: In the original PR, we discussed changing what we have to change in the membership manager and then trying to share some code with the AbstractMembershipManager
. Is that something we have abandoned / is it too hard to do? This class is a complex beast, I was hoping we can share the state machine, and just plug a different reconciliation logic.
If we are not going to share the code, I think we have to go through the last months of changes on AbstractMembershipManager
and see which of the changes apply to our membership manager as well. It looks like a number of changes were made in the state machine since we have forked the membership manager - we need to check whether they need to be adopted by our membership manager.
private static final int PARTITION_0 = 0; | ||
private static final int PARTITION_1 = 1; | ||
|
||
private Time time = new MockTime(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private static final int PARTITION_1 = 1; | ||
|
||
private Time time = new MockTime(0); | ||
private Metrics metrics = new Metrics(time); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
} | ||
|
||
public void onHeartbeatFailure(boolean retriable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not unit tested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
} | ||
|
||
public void maybeRejoinStaleMember() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not unit tested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
rejoinedWhileReconciliationInProgress = false; | ||
} | ||
|
||
public void onTasksRevokedCallbackCompleted(final StreamsOnTasksRevokedCallbackCompletedEvent event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not unit tested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
public boolean isLeavingGroup() { | ||
MemberState state = state(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we'd use the getter here. Maybe a result of copying?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Me neither, I copied this over. I will change it.
} | ||
|
||
public boolean shouldHeartbeatNow() { | ||
MemberState state = state(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use the getter
} | ||
|
||
public void onHeartbeatRequestGenerated() { | ||
MemberState state = state(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use the getter
private static final String TOPIC_0 = "topic-0"; | ||
private static final String TOPIC_1 = "topic-1"; | ||
|
||
private static final int PARTITION_0 = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you asked me to call these tasks instead of partitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You gave good reasons not to do it under my comment, so I actually thought you refrained from changing it.
SortedSet<StreamsRebalanceData.TaskId> taskIdSet = new TreeSet<>(); | ||
for (final Map.Entry<String, SortedSet<Integer>> task : tasks.entrySet()) { | ||
final String subtopologyId = task.getKey(); | ||
final SortedSet<Integer> partitions = task.getValue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you asked me to call these tasks instead of partitions
8403d02
to
6c65e3f
Compare
b0eaf13
to
5e6812f
Compare
@lucasbru I included your feedback. I added javadocs mainly on public methods. Please re-review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for PR @cadonna - I've made a pass over the StreamsMembershipManager
I'll do the test in another pass
} | ||
|
||
public void registerStateListener(MemberStateListener listener) { | ||
stateUpdatesListeners.add(Objects.requireNonNull(listener, "State updates listener cannot be null")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should stateUpdatesListeners
be a Set
or should we check for possibly adding a duplicate MemberStateListener
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not allowing duplicate listener makes sense to me. However, I would prefer to check for reference equality instead of using a Set
since the latter requires equals()
and hashCode()
being implemented which is not needed for listener IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check for duplicates.
stateUpdatesListeners.add(Objects.requireNonNull(listener, "State updates listener cannot be null")); | ||
} | ||
|
||
void notifyEpochChange(Optional<Integer> epoch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the package-private modifier here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* the interval. | ||
*/ | ||
public boolean shouldHeartbeatNow() { | ||
return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why heartbeat on leaving? more for my own knowledge
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a member leaves the group, it send a heartbeat with the leave epoch to the group coordinator. The leave epoch is -1 for non-static members and -2 for static members. So the group coordinator knows that the member can be safely removed from the group.
A leave group heartbeat request does not need to wait for the expiration of the heartbeat interval, but should be sent immediately.
*/ | ||
public void onFenced() { | ||
if (state == MemberState.PREPARE_LEAVING) { | ||
log.info("Member {} with epoch {} got fenced but it is already preparing to leave " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the other log statements in onFenced
are at the debug
level, is there reason this one is info
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question! I actually do not know. I copied this over from the AbstractMembershipManager
without noticing this. I am going to change the level to debug
.
5e6812f
to
8dab352
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting there! I made a deeper pass on the main class. I did not look into the unit tests too closely, since Bill intended to do this.
* If the member is already part of the group, this will only ensure that the updated subscription | ||
* is included in the next heartbeat request. | ||
* <p/> | ||
* Note that list of topics of the subscription is taken from the shared subscription state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Note that list of topics of the subscription is taken from the shared subscription state. | |
* Note that the list of topics in the subscription is taken from the shared subscription state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* <p/> | ||
* Note that list of topics of the subscription is taken from the shared subscription state. | ||
*/ | ||
public void onSubscriptionUpdated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does SubscriptionUpdated
mean in the context of Kafka Streams? It seems like we don't really subscribe to topics, so how does this relate to e.g. the topology?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we do subscribe:
kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Line 1046 in ddfadf7
mainConsumer.unsubscribe(); |
The async consumer passes the subscription back to the network thread by means of an event. Processing that event calls onSubscriptionUpdated()
.
In future, we could consider removing the explicit subscription and let the StreamsRebalanceData
or StreamsRebalanceEventsProcessor
subscribe depending on the topology.
* <ul> | ||
* <li>Keeping member state</li> | ||
* <li>Keeping assignment for the member</li> | ||
* <li>Reconciling assignment, for example if tasks need to be revoked before other tasks can be assigned</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* <li>Reconciling assignment, for example if tasks need to be revoked before other tasks can be assigned</li> | |
* <li>Reconciling assignment, for example, if tasks need to be revoked before other tasks can be assigned</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private static final String MEMBER_ID = "test-member-1"; | ||
private static final int MEMBER_EPOCH = 1; | ||
|
||
private static final String SUB_TOPOLOGY_ID_0 = "subtopology-0"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the capitalization is subtopologyId
, then this should be SUBTOPOLOGY_ID
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right!
public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) { | ||
StreamsGroupHeartbeatResponseData responseData = response.data(); | ||
throwIfUnexpectedError(responseData); | ||
if (state == MemberState.LEAVING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For discussion: Should we test that the onHeartbeatSuccess
is a NOOP in these three cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The case with state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()
is not a noop, because it might complete the leaveGroupInProgress
future in maybeCompleteLeaveInProgress()
. That case is also covered by testLeaveGroupWhenInGroupWithAssignment()
.
For the other two cases, I need to see how I can sensibly verify a noop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I verified the noop by checking if the member state listener was NOT called. See testOnHeartbeatSuccessWhenInFenced()
for an example.
*/ | ||
private void transitionTo(MemberState nextState) { | ||
if (!state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) { | ||
throw new IllegalStateException(String.format("Invalid state transition from %s to %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we test detection of invalid state transitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand why this method is not part of the MemberState
class. Then it would also be easy to test the transitions.
* Notify when the poll timer expired. | ||
*/ | ||
public void onPollTimerExpired() { | ||
transitionToSendingLeaveGroup(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we test what happens in LEAVING
and UNSUBSCRIBED
state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean FATAL
and UNSUBSCRIBED
?
"current assignment {}. Nothing to reconcile.", | ||
targetAssignment, currentAssignment); | ||
// Make sure we transition the member back to STABLE if it was RECONCILING (ex. | ||
// member was RECONCILING unresolved assignments that were just removed by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it mean for an assignment to be unresolved if we do not use UUIDs?
copy & paste error?
You may want to replace this comment by a test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replaced the comment with a test.
It is not clear to me how a member can be in JOINING
with a reconciled assignment. When the member starts up and joins it does not a reconciled assignment with the assignment that comes in with onHeartbeatSuccess()
. If the member rejoins after being stale because it exceeded the poll time out the current task assignment is cleared. Thus, the assignment cannot be reconciled with the assignment that comes in with onHeartbeatSuccess()
.
public class StreamsMembershipManager implements RequestManager { | ||
|
||
/** | ||
* A data structure to represent the current task assignment, and current target task assignment of a member in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You renamed current target task assignment
to just target task assignment
, so then you should probably update this comment as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Override | ||
public NetworkClientDelegate.PollResult poll(long currentTimeMs) { | ||
if (state == MemberState.RECONCILING) { | ||
maybeReconcile(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the reconciliation to poll
in the consumer membership manager back then, because we may receive new metadata for resolving topic UUIDs. Does it still make sense to reconcile in poll?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to postpone this broader discussion after merging this PR.
The Streams membership manager is used client-side in the background thread of the async consumer. For each member /consumer, it is responsible for: * keeping the member state, * keeping assignments for the member, * reconciling the assignments of the member -- for example when tasks need to be revoked before other tasks are assigned * requesting invocations of assignment and revocation callbacks by the stream thread. The Streams membership manager is called by the background thread of the async consumer, directly in its event loop and from the Streams group heartbeat request manager. The Streams membership manager uses the Streams rebalance events processor to request assignment/revocation callback in the stream thread.
295686b
to
5b2a196
Compare
The Streams membership manager is used client-side in the
background thread of the async consumer. For each member
/consumer, it is responsible for:
when tasks need to be revoked before other tasks are assigned
by the stream thread.
The Streams membership manager is called by the background thread of
the async consumer, directly in its event loop and from the
Streams group heartbeat request manager. The Streams membership
manager uses the Streams rebalance events processor to request
assignment/revocation callback in the stream thread.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)