diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index aec130ad91536..80cda6d6e0bf3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -31,9 +31,14 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -48,10 +53,12 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.test.api.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestExtensions; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.Exit; @@ -63,6 +70,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,6 +82,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1340,6 +1349,43 @@ public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(Cluster adminClient.close(); } + @ClusterTest + public void testDiffCompressionProdcueSend(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + clusterInstance.createTopic("test", 1, (short) 1); + clusterInstance.createTopic("topic", 1, (short) 1); + KafkaProducer producer = createProducer(clusterInstance); + Future a = producer.send(new ProducerRecord<>("test", "lll")); + Future b = producer.send(new ProducerRecord<>("topic", "lll")); + Future c = producer.send(new ProducerRecord<>("test", "kkkk")); + + a.get(); + b.get(); + c.get(); + producer.close(); + + + HashMap consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Consumer consumer = clusterInstance.consumer(consumerProps)) { + consumer.subscribe(List.of("test", "topic")); + ConsumerRecords records = consumer.poll(Duration.ofMillis(5)); + assertTrue(records.count() != 0); + for (ConsumerRecord record : records) { + System.out.printf( + "JJJj Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n", + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value() + ); + } + } + } + @ClusterTemplate("generate") public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception { try (Admin adminClient = clusterInstance.admin(); @@ -1464,6 +1510,7 @@ private KafkaProducer createProducer(ClusterInstance clusterInst Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5"); return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); }