diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index f70c1a338148..51f105c812a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -509,6 +509,10 @@ public void abortRecordAppends() { recordsBuilder.abort(); } + public String topic() { + return topicPartition.topic(); + } + public boolean isClosed() { return recordsBuilder.isClosed(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ec50dc9bc26d..55615b1ddafa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -93,6 +93,8 @@ public class RecordAccumulator { private final TransactionManager transactionManager; private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire. + Map fakeCompression = Map.of("test", Compression.gzip().build(), "topic", Compression.lz4().build()); + /** * Create a new record accumulator * @@ -296,6 +298,12 @@ public RecordAppendResult append(String topic, Cluster cluster) throws InterruptedException { TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize))); + + Compression newCompression = compression; + if (fakeCompression.containsKey(topic)) { + newCompression = fakeCompression.get(topic); + } + // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); @@ -345,7 +353,7 @@ public RecordAppendResult append(String topic, if (buffer == null) { byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers)); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, newCompression.type(), key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock); // This call may block if we exhausted buffer space. buffer = free.allocate(size, maxTimeToBlock); @@ -408,7 +416,7 @@ private RecordAppendResult appendNewBatch(String topic, return appendResult; } - MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic()); + MemoryRecordsBuilder recordsBuilder = recordsBuilder(topic, buffer, apiVersions.maxUsableProduceMagic()); ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs); FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, callbacks, nowMs)); @@ -419,12 +427,17 @@ private RecordAppendResult appendNewBatch(String topic, return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes()); } - private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) { + private MemoryRecordsBuilder recordsBuilder(String topic, ByteBuffer buffer, byte maxUsableMagic) { if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) { throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " + "support the required message format (v2). The broker must be version 0.11 or later."); } - return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L); + Compression newCompression = compression; + if (fakeCompression.containsKey(topic)) { + newCompression = fakeCompression.get(topic); + } + + return MemoryRecords.builder(buffer, maxUsableMagic, newCompression, TimestampType.CREATE_TIME, 0L); } /** @@ -534,7 +547,13 @@ public int splitAndReenqueue(ProducerBatch bigBatch) { // Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever // is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure // the split doesn't happen too often. - CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(), + + Compression nweCompression = compression; + if (fakeCompression.containsKey(bigBatch.topicPartition.topic())) { + nweCompression = fakeCompression.get(bigBatch.topicPartition.topic()); + } + + CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), nweCompression.type(), Math.max(1.0f, (float) bigBatch.compressionRatio())); Deque dq = bigBatch.split(this.batchSize); int numSplitBatches = dq.size(); @@ -625,10 +644,12 @@ private void insertInSequenceOrder(Deque deque, ProducerBatch bat private long batchReady(boolean exhausted, TopicPartition part, Node leader, long waitedTimeMs, boolean backingOff, int backoffAttempts, boolean full, long nextReadyCheckDelayMs, Set readyNodes) { + log.debug("------" + readyNodes); if (!readyNodes.contains(leader) && !isMuted(part)) { long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs; boolean expired = waitedTimeMs >= timeToWaitMs; boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); + log.debug("------========" + full + expired + exhausted + closed + flushesInProgress + transactionCompleting); boolean sendable = full || expired || exhausted @@ -645,6 +666,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } + log.debug("------" + readyNodes); return nextReadyCheckDelayMs; } @@ -726,6 +748,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin backoffAttempts = batch.attempts(); dequeSize = deque.size(); full = dequeSize > 1 || batch.isFull(); + // Q: why is full? + boolean k = dequeSize > 1; + log.debug("dequeSize > 1" + k); + log.debug("batch.isFull()" + batch.isFull()); } if (leader == null) { @@ -748,9 +774,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin --queueSizesIndex; } } - + log.debug("zzzzz" + readyNodes.toString()); nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff, backoffAttempts, full, nextReadyCheckDelayMs, readyNodes); + log.debug("222zzzzz" + readyNodes.toString()); } } @@ -784,6 +811,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin */ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { Set readyNodes = new HashSet<>(); + log.debug("kkkkk " + readyNodes); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set unknownLeaderTopics = new HashSet<>(); // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate @@ -792,6 +820,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { final String topic = topicInfoEntry.getKey(); nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); } + log.debug("2222kkkkk " + readyNodes); return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index b37b1f1ca685..6d02698ef74b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -822,6 +822,8 @@ private int estimatedBytesWritten() { return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes; } else { // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes + // Why estimatedCompressionRatio is not set automaticlly? + System.err.println("estimatedCompressionRatio " + estimatedCompressionRatio); return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR); } } diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index 0992580eca1d..1e918e25c06a 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG # We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index be853d9d990b..013072d63ae2 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -232,7 +232,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } - protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType): Unit = { + protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType, topic: String = topic): Unit = { val partition = 0 val baseTimestamp = 123456L diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index c1ba3a1b83c4..0a4f17f6956c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -94,6 +94,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSendDifferentCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = { + val producer = createProducer( + compressionType = "gzip", + lingerMs = Int.MaxValue, + deliveryTimeoutMs = Int.MaxValue) + sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) + sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testSendNonCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = { diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index aec130ad9153..f884862cc4e6 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -25,15 +25,23 @@ import org.apache.kafka.clients.admin.DeleteTopicsOptions; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -48,10 +56,13 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestExtensions; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.Exit; @@ -63,6 +74,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,6 +86,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1340,6 +1353,62 @@ public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(Cluster adminClient.close(); } + @ClusterTest( + serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + } + ) + public void testDiffCompressionProdcueSend(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + clusterInstance.createTopic("test", 1, (short) 1); + clusterInstance.createTopic("topic", 1, (short) 1); + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + Producer producer = clusterInstance.producer(producerProps); + Future a = producer.send(new ProducerRecord<>("test", "lll")); + Future b = producer.send(new ProducerRecord<>("topic", "lll")); + Future c = producer.send(new ProducerRecord<>("test", "kkkk")); + producer.close(); + + TopicPartition tp1 = new TopicPartition("test", 0); + TopicPartition tp2 = new TopicPartition("topic", 0); + + try (Admin admin = clusterInstance.admin()) { + ListTopicsResult res = admin.listTopics(); + System.err.println("ssss " + res.listings().get()); + ListOffsetsResult lis = admin.listOffsets(Map.of(tp1, OffsetSpec.latest(), tp2, OffsetSpec.latest())); + System.err.println("jjjjj " + lis.all().get()); + } + + HashMap consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Consumer consumer = clusterInstance.consumer(consumerProps)) { + consumer.assign(List.of(tp1, tp2)); + TestUtils.waitForCondition( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(5)); + for (ConsumerRecord record : records) { + System.out.printf( + "JJJj Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n", + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value() + ); + } + return records.count() != 0; + }, 60000L, "fuck error" + ); + } + } + @ClusterTemplate("generate") public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception { try (Admin adminClient = clusterInstance.admin(); @@ -1464,6 +1533,7 @@ private KafkaProducer createProducer(ClusterInstance clusterInst Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "5"); return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); }