Skip to content

Commit

Permalink
MINOR: Cleanup zk condition in TransactionsTest,
Browse files Browse the repository at this point in the history
PlaintextConsumerAssignorsTest, QuorumTestHarness
  • Loading branch information
frankvicky committed Jan 20, 2025
1 parent 71495a2 commit 94fa519
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest {
// Only the classic group protocol supports client-side assignors
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.assignmentStrategy={2}")
@CsvSource(Array(
"zk, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
"zk, classic, org.apache.kafka.clients.consumer.RangeAssignor",
"kraft, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
"kraft, classic, org.apache.kafka.clients.consumer.RangeAssignor"
))
Expand Down
40 changes: 11 additions & 29 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,9 @@ class TransactionsTest extends IntegrationTestHarness {
fail("Should not be able to send messages from a fenced producer.")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException => {
if (quorum == "zk") {
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
} else {
// In kraft mode, transactionV2 is used.
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}
}
case e: ExecutionException =>
// In kraft mode, transactionV2 is used.
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
case e: Exception =>
throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
}
Expand Down Expand Up @@ -622,27 +617,14 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)

if (quorum == "zk") {
// In zk mode, transaction v1 is used.
try {
// Now that the transaction has expired, the second send should fail with a ProducerFencedException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a ProducerFencedException since the transaction has expired")
} catch {
case _: ProducerFencedException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
}
} else {
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a InvalidProducerEpochException since the transaction has expired")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised a InvalidProducerEpochException since the transaction has expired")
} catch {
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
}

// Verify that the first message was aborted and the second one was never written at all.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ abstract class QuorumTestHarness extends Logging {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0,$listeners")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, s"CONTROLLER,$listenerNames")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0")
// Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")
val config = new KafkaConfig(props)

Expand Down Expand Up @@ -369,7 +368,7 @@ object QuorumTestHarness {

/**
* Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
* This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
* This assumes that brokers, admin clients, producers and consumers are not created in another @BeforeClass,
* which is true for core tests where this harness is used.
*/
@BeforeAll
Expand Down Expand Up @@ -437,9 +436,6 @@ object QuorumTestHarness {
)
}

// The following is for tests that only work with the classic group protocol because of relying on Zookeeper
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))

// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
Expand Down

0 comments on commit 94fa519

Please sign in to comment.