Skip to content

Commit

Permalink
test different compression
Browse files Browse the repository at this point in the history
improve previous test;
  • Loading branch information
TaiJuWu committed Nov 23, 2024
1 parent 8ba25d2 commit 4b15386
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
Expand All @@ -48,10 +54,12 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
Expand All @@ -63,6 +71,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -74,6 +83,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -1340,6 +1350,49 @@ public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(Cluster
adminClient.close();
}

@ClusterTest
public void testDiffCompressionProdcueSend(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
clusterInstance.createTopic("test", 1, (short) 1);
clusterInstance.createTopic("topic", 1, (short) 1);
Map<String, Object> producerProps = new HashMap();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = clusterInstance.producer(producerProps);
Future<RecordMetadata> a = producer.send(new ProducerRecord<>("test", "lll"));
Future<RecordMetadata> b = producer.send(new ProducerRecord<>("topic", "lll"));
Future<RecordMetadata> c = producer.send(new ProducerRecord<>("test", "kkkk"));
producer.close();


HashMap<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (Consumer<String, String> consumer = clusterInstance.consumer(consumerProps)) {
consumer.subscribe(List.of("test", "topic"));
TestUtils.waitForCondition(
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"JJJj Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
return records.count() != 0;
}, 60000L, "fuck error"
);
}
}

@ClusterTemplate("generate")
public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.admin();
Expand Down Expand Up @@ -1464,6 +1517,7 @@ private KafkaProducer<String, String> createProducer(ClusterInstance clusterInst
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
}

Expand Down

0 comments on commit 4b15386

Please sign in to comment.