test #14 (Open)

Wants to merge 6 commits into trunk.

This looks like an experimental/debugging PR against the Kafka producer: it threads a hard-coded per-topic compression override (a fakeCompression map keyed by topic name) through RecordAccumulator, exposes topic() on ProducerBatch, adds debug tracing along the ready-check path, and adds tests that produce to topics with different codecs.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -509,6 +509,10 @@ public void abortRecordAppends() {
recordsBuilder.abort();
}

public String topic() {
return topicPartition.topic();
}

public boolean isClosed() {
return recordsBuilder.isClosed();
}
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -93,6 +93,8 @@ public class RecordAccumulator {
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.

    // Hard-coded per-topic compression overrides used by this experiment.
    private final Map<String, Compression> fakeCompression = Map.of("test", Compression.gzip().build(), "topic", Compression.lz4().build());

/**
* Create a new record accumulator
*
@@ -296,6 +298,12 @@ public RecordAppendResult append(String topic,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));


Compression newCompression = compression;
if (fakeCompression.containsKey(topic)) {
newCompression = fakeCompression.get(topic);
}

        // We keep track of the number of appending threads to make sure we do not miss batches in
        // abortIncompleteBatches().
appendsInProgress.incrementAndGet();
@@ -345,7 +353,7 @@ public RecordAppendResult append(String topic,

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, newCompression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
@@ -408,7 +416,7 @@ private RecordAppendResult appendNewBatch(String topic,
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
MemoryRecordsBuilder recordsBuilder = recordsBuilder(topic, buffer, apiVersions.maxUsableProduceMagic());
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
@@ -419,12 +427,17 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
private MemoryRecordsBuilder recordsBuilder(String topic, ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
Compression newCompression = compression;
if (fakeCompression.containsKey(topic)) {
newCompression = fakeCompression.get(topic);
}

return MemoryRecords.builder(buffer, maxUsableMagic, newCompression, TimestampType.CREATE_TIME, 0L);
}

/**
@@ -534,7 +547,13 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),

        Compression newCompression = compression;
        if (fakeCompression.containsKey(bigBatch.topicPartition.topic())) {
            newCompression = fakeCompression.get(bigBatch.topicPartition.topic());
        }

        CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), newCompression.type(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
@@ -625,10 +644,12 @@ private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch bat
private long batchReady(boolean exhausted, TopicPartition part, Node leader,
long waitedTimeMs, boolean backingOff, int backoffAttempts,
boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
log.debug("------" + readyNodes);
if (!readyNodes.contains(leader) && !isMuted(part)) {
long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
log.debug("------========" + full + expired + exhausted + closed + flushesInProgress + transactionCompleting);
boolean sendable = full
|| expired
|| exhausted
@@ -645,6 +666,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
log.debug("------" + readyNodes);
return nextReadyCheckDelayMs;
}

@@ -726,6 +748,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
backoffAttempts = batch.attempts();
dequeSize = deque.size();
full = dequeSize > 1 || batch.isFull();
                // Q: why is the batch considered full?
                boolean multiBatch = dequeSize > 1;
                log.debug("dequeSize > 1: {}", multiBatch);
                log.debug("batch.isFull(): {}", batch.isFull());
}

if (leader == null) {
@@ -748,9 +774,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
--queueSizesIndex;
}
}

log.debug("zzzzz" + readyNodes.toString());
nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff,
backoffAttempts, full, nextReadyCheckDelayMs, readyNodes);
log.debug("222zzzzz" + readyNodes.toString());
}
}

@@ -784,6 +811,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
*/
public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
log.debug("kkkkk " + readyNodes);
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
Expand All @@ -792,6 +820,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
final String topic = topicInfoEntry.getKey();
nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
log.debug("2222kkkkk " + readyNodes);
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
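The same three-line override lookup now appears in append(), recordsBuilder(), and splitAndReenqueue(). A possible consolidation — a sketch only, assuming the hard-coded fakeCompression map stays for this experiment — would be a single helper on RecordAccumulator:

```java
// Hypothetical helper, not part of this diff: resolve the per-topic override once,
// falling back to the producer-wide compression when no override exists.
private Compression compressionFor(String topic) {
    return fakeCompression.getOrDefault(topic, compression);
}
```

Each call site would then reduce to `Compression newCompression = compressionFor(topic);`.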

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -822,6 +822,8 @@ private int estimatedBytesWritten() {
return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
            // Q: why is estimatedCompressionRatio not set automatically?
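            // (Likely answer, worth verifying against the source: ProducerBatch's constructor
            // seeds the builder via setEstimatedCompressionRatio(CompressionRatioEstimator.estimation(topic, type)),
            // so the ratio only adapts as completed batches feed observations back into the estimator.)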
System.err.println("estimatedCompressionRatio " + estimatedCompressionRatio);
return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
1 change: 1 addition & 0 deletions clients/src/test/resources/log4j.properties
@@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.kafka=ERROR
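# Raised to DEBUG so the log.debug tracing added in RecordAccumulator shows up in test output.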
log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG
# We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper
log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -232,7 +232,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}

protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType): Unit = {
protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType, topic: String = topic): Unit = {
val partition = 0

val baseTimestamp = 123456L
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -94,6 +94,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendDifferentCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue)
    // Exercise both per-topic overrides from fakeCompression: "test" (gzip) and "topic" (lz4).
    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME, "test")
    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME, "topic")
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendNonCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
Expand Down
70 changes: 70 additions & 0 deletions tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -25,15 +25,23 @@
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
@@ -48,10 +56,13 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
@@ -63,6 +74,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -74,6 +86,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -1340,6 +1353,62 @@ public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(Cluster
adminClient.close();
}

@ClusterTest(
serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
}
)
    public void testDiffCompressionProduceSend(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
clusterInstance.createTopic("test", 1, (short) 1);
clusterInstance.createTopic("topic", 1, (short) 1);
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = clusterInstance.producer(producerProps);
        Future<RecordMetadata> first = producer.send(new ProducerRecord<>("test", "lll"));
        Future<RecordMetadata> second = producer.send(new ProducerRecord<>("topic", "lll"));
        Future<RecordMetadata> third = producer.send(new ProducerRecord<>("test", "kkkk"));
        producer.close();

        // close() flushes first, so the futures are complete; get() surfaces any send failure.
        first.get();
        second.get();
        third.get();

TopicPartition tp1 = new TopicPartition("test", 0);
TopicPartition tp2 = new TopicPartition("topic", 0);

try (Admin admin = clusterInstance.admin()) {
ListTopicsResult res = admin.listTopics();
System.err.println("ssss " + res.listings().get());
ListOffsetsResult lis = admin.listOffsets(Map.of(tp1, OffsetSpec.latest(), tp2, OffsetSpec.latest()));
System.err.println("jjjjj " + lis.all().get());
}

HashMap<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (Consumer<String, String> consumer = clusterInstance.consumer(consumerProps)) {
consumer.assign(List.of(tp1, tp2));
TestUtils.waitForCondition(
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"JJJj Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
return records.count() != 0;
}, 60000L, "fuck error"
);
}
}
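If the intent is to assert delivery rather than eyeball the printed records, the wait loop could accumulate what it sees and require all three produced records. A sketch, assuming the same three sends as above and that java.util.Set/HashSet are imported:

```java
// Collect (topic, value) pairs until all three produced records have been observed.
Set<String> seen = new HashSet<>();
TestUtils.waitForCondition(() -> {
    consumer.poll(Duration.ofMillis(100)).forEach(r -> seen.add(r.topic() + ":" + r.value()));
    return seen.containsAll(Set.of("test:lll", "topic:lll", "test:kkkk"));
}, 60000L, "Did not consume all three records within the timeout");
```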

@ClusterTemplate("generate")
public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.admin();
@@ -1464,6 +1533,7 @@ private KafkaProducer<String, String> createProducer(ClusterInstance clusterInst
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
}
