Skip to content

Commit

Permalink
KAFKA-18392: Ensure client sets member ID for share group (#18649)
Browse files Browse the repository at this point in the history
Reviewers: Apoorv Mittal <[email protected]>, Lianet Magrans <[email protected]>
  • Loading branch information
AndrewJSchofield authored Jan 22, 2025
1 parent 239708f commit 9da516b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id." },
"about": "The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,7 @@ ConsumerGroup consumerGroup(
* created if it does not exist.
*
* @return A ConsumerGroup.
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
* if the group is not a consumer group.
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
* @throws IllegalStateException if the group does not have the expected type.
* Package private for testing.
*/
Expand Down Expand Up @@ -846,28 +845,28 @@ private ShareGroup getOrMaybeCreateShareGroup(

if (group == null) {
return new ShareGroup(snapshotRegistry, groupId);
} else {
if (group.type() == SHARE) {
return (ShareGroup) group;
} else {
// We don't support upgrading/downgrading between protocols at the moment so
// we throw an exception if a group exists with the wrong type.
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
}
}

if (group.type() != SHARE) {
// We don't support upgrading/downgrading between protocols at the moment so
// we throw an exception if a group exists with the wrong type.
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.",
groupId));
}

return (ShareGroup) group;
}

/**
* Gets or maybe creates a share group.
* The method should be called on the replay path.
* Gets or maybe creates a share group and updates the groups map if a new group is created.
*
* @param groupId The group id.
* @param createIfNotExists A boolean indicating whether the group should be
* created if it does not exist.
*
* @return A ShareGroup.
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
* if the group is not a consumer group.
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
* @throws IllegalStateException if the group does not have the expected type.
*
* Package private for testing.
*/
Expand All @@ -878,22 +877,22 @@ ShareGroup getOrMaybeCreatePersistedShareGroup(
Group group = groups.get(groupId);

if (group == null && !createIfNotExists) {
throw new IllegalStateException(String.format("Share group %s not found.", groupId));
throw new GroupIdNotFoundException(String.format("Share group %s not found.", groupId));
}

if (group == null) {
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, groupId);
groups.put(groupId, shareGroup);
return shareGroup;
} else {
if (group.type() == SHARE) {
return (ShareGroup) group;
} else {
// We don't support upgrading/downgrading between protocols at the moment so
// we throw an exception if a group exists with the wrong type.
throw new IllegalStateException(String.format("Group %s is not a share group.", groupId));
}
}

if (group.type() != SHARE) {
// We don't support upgrading/downgrading between protocols at the moment so
// we throw an exception if a group exists with the wrong type.
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
}

return (ShareGroup) group;
}

/**
Expand Down Expand Up @@ -2032,11 +2031,10 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh

// Get or create the share group.
boolean createIfNotExists = memberEpoch == 0;
final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, createIfNotExists);
final ShareGroup group = getOrMaybeCreateShareGroup(groupId, createIfNotExists);
throwIfShareGroupIsFull(group, memberId);

// Get or create the member.
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
group,
memberId,
Expand Down Expand Up @@ -2143,9 +2141,12 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
.setHeartbeatIntervalMs(shareGroupHeartbeatIntervalMs(groupId));

// The assignment is only provided in the following cases:
// 1. The member just joined or rejoined to group (epoch equals to zero);
// 1. The member sent a full request. It does so when joining or rejoining the group with zero
// as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields
// (subscribedTopicNames) to detect a full request as those must be set in a full request.
// 2. The member's assignment has been updated.
if (memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
boolean isFullRequest = subscribedTopicNames != null;
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14537,7 +14537,7 @@ public void testShareGroupUnknownGroupId() {
.withShareGroupAssignor(assignor)
.build();

assertThrows(IllegalStateException.class, () ->
assertThrows(GroupIdNotFoundException.class, () ->
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
Expand Down

0 comments on commit 9da516b

Please sign in to comment.