Skip to content

Commit

Permalink
update try finally block
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Nov 5, 2024
1 parent e34799f commit ee51494
Show file tree
Hide file tree
Showing 8 changed files with 1,437 additions and 1,368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,53 @@ class MetadataVersionIntegrationTest {
))
def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.admin()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
try {
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())

// Update to new version
val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
// Update to new version
val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()

// Verify that new version is visible on broker
TestUtils.waitUntilTrue(() => {
val describeResult2 = admin.describeFeatures()
val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
}, "Never saw metadata.version increase on broker")
admin.close()
// Verify that new version is visible on broker
TestUtils.waitUntilTrue(() => {
val describeResult2 = admin.describeFeatures()
val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
}, "Never saw metadata.version increase on broker")
} finally {
admin.close()
}
}

@ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.admin()
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
admin.close()
try {
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
} finally {
admin.close()
}
}

@ClusterTest(types = Array(Type.KRAFT))
def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.admin()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(),
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel())
admin.close()
try {
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(),
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel())
} finally {
admin.close()
}
}
}
39 changes: 21 additions & 18 deletions core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,27 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
val userName = "user"

val admin = cluster.admin()
val results = admin.alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get

val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)

verifyDescribeEntityQuotas(entity, Map.empty)

alterEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
), validateOnly = false)

verifyDescribeEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
))
admin.close()
try {
val results = admin.alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get

val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)

verifyDescribeEntityQuotas(entity, Map.empty)

alterEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
), validateOnly = false)

verifyDescribeEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
))
} finally {
admin.close()
}
}

@ClusterTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,130 +84,133 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
createOffsetsTopic()

val admin = cluster.admin()
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
try {
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)

val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)

// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")
// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")

// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")

val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")

val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava)),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava)),
).asJava),
)

for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava)),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava)),
).asJava),
)

val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)
val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)

assertEquals(expected, actual)
assertEquals(expected, actual)
}
} finally {
admin.close()
}
admin.close()
}
}
Loading

0 comments on commit ee51494

Please sign in to comment.