From 9da516b1a911c24bb3575a5d2bddebe1b3268c7f Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 22 Jan 2025 08:57:40 +0000 Subject: [PATCH] KAFKA-18392: Ensure client sets member ID for share group (#18649) Reviewers: Apoorv Mittal , Lianet Magrans --- .../message/ShareGroupHeartbeatRequest.json | 2 +- .../group/GroupMetadataManager.java | 55 ++++++++++--------- .../group/GroupMetadataManagerTest.java | 2 +- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json index 523150a92476c..e49e022d45864 100644 --- a/clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json @@ -28,7 +28,7 @@ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", - "about": "The member id." }, + "about": "The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch; 0 to join the group; -1 to leave the group." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 2d5fd1e3288ca..e1e151d53d3f9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -728,8 +728,7 @@ ConsumerGroup consumerGroup( * created if it does not exist. * * @return A ConsumerGroup. - * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or - * if the group is not a consumer group. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false. * @throws IllegalStateException if the group does not have the expected type. * Package private for testing. */ @@ -846,28 +845,28 @@ private ShareGroup getOrMaybeCreateShareGroup( if (group == null) { return new ShareGroup(snapshotRegistry, groupId); + } else { + if (group.type() == SHARE) { + return (ShareGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId)); + } } - - if (group.type() != SHARE) { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", - groupId)); - } - - return (ShareGroup) group; } /** - * Gets or maybe creates a share group. + * The method should be called on the replay path. + * Gets or maybe creates a share group and updates the groups map if a new group is created. * * @param groupId The group id. * @param createIfNotExists A boolean indicating whether the group should be * created if it does not exist. * * @return A ShareGroup. - * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or - * if the group is not a consumer group. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false. + * @throws IllegalStateException if the group does not have the expected type. * * Package private for testing. */ @@ -878,22 +877,22 @@ ShareGroup getOrMaybeCreatePersistedShareGroup( Group group = groups.get(groupId); if (group == null && !createIfNotExists) { - throw new IllegalStateException(String.format("Share group %s not found.", groupId)); + throw new GroupIdNotFoundException(String.format("Share group %s not found.", groupId)); } if (group == null) { ShareGroup shareGroup = new ShareGroup(snapshotRegistry, groupId); groups.put(groupId, shareGroup); return shareGroup; + } else { + if (group.type() == SHARE) { + return (ShareGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new IllegalStateException(String.format("Group %s is not a share group.", groupId)); + } } - - if (group.type() != SHARE) { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId)); - } - - return (ShareGroup) group; } /** @@ -2032,11 +2031,10 @@ private CoordinatorResult sh // Get or create the share group. boolean createIfNotExists = memberEpoch == 0; - final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, createIfNotExists); + final ShareGroup group = getOrMaybeCreateShareGroup(groupId, createIfNotExists); throwIfShareGroupIsFull(group, memberId); // Get or create the member. - if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); ShareGroupMember member = getOrMaybeSubscribeShareGroupMember( group, memberId, @@ -2143,9 +2141,12 @@ private CoordinatorResult sh .setHeartbeatIntervalMs(shareGroupHeartbeatIntervalMs(groupId)); // The assignment is only provided in the following cases: - // 1. The member just joined or rejoined to group (epoch equals to zero); + // 1. The member sent a full request. It does so when joining or rejoining the group with zero + // as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields + // (subscribedTopicNames) to detect a full request as those must be set in a full request. // 2. The member's assignment has been updated. - if (memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { + boolean isFullRequest = subscribedTopicNames != null; + if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createShareGroupResponseAssignment(updatedMember)); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 017efb3e0079a..4e4e7cf7645cb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -14537,7 +14537,7 @@ public void testShareGroupUnknownGroupId() { .withShareGroupAssignor(assignor) .build(); - assertThrows(IllegalStateException.class, () -> + assertThrows(GroupIdNotFoundException.class, () -> context.shareGroupHeartbeat( new ShareGroupHeartbeatRequestData() .setGroupId(groupId)