From c2235699bf98eedd58dafa06cac70cbdec8c99be Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 23 Nov 2024 06:57:38 +0000 Subject: [PATCH] test different compression improve previous test; --- .../apache/kafka/tools/TopicCommandTest.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index aec130ad91536..0dd1ce8783db8 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -31,9 +31,15 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -48,10 +54,12 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.test.api.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestExtensions; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.Exit; @@ -63,6 +71,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,6 +83,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1340,6 +1350,50 @@ public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(Cluster adminClient.close(); } + @ClusterTest + public void testDiffCompressionProdcueSend(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + clusterInstance.waitForReadyBrokers(); + clusterInstance.createTopic("test", 1, (short) 1); + clusterInstance.createTopic("topic", 1, (short) 1); + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + Producer producer = clusterInstance.producer(producerProps); + Future a = producer.send(new ProducerRecord<>("test", "lll")); + Future b = producer.send(new ProducerRecord<>("topic", "lll")); + Future c = producer.send(new ProducerRecord<>("test", "kkkk")); + producer.close(); + + + HashMap consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Consumer consumer = clusterInstance.consumer(consumerProps)) { + consumer.subscribe(List.of("test", "topic")); + TestUtils.waitForCondition( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(5)); + for (ConsumerRecord record : records) { + System.out.printf( + "JJJj Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n", + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value() + ); + } + return records.count() != 0; + }, 60000L, "fuck error" + ); + } + } + @ClusterTemplate("generate") public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception { try (Admin adminClient = clusterInstance.admin(); @@ -1464,6 +1518,7 @@ private KafkaProducer createProducer(ClusterInstance clusterInst Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5"); return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); }