From 8ba25d2146be8f45b6dce66e23260adbd5c24762 Mon Sep 17 00:00:00 2001
From: TaiJu Wu
Date: Sat, 23 Nov 2024 05:58:42 +0000
Subject: [PATCH] test

---
 .../producer/internals/ProducerBatch.java     |  4 +++
 .../producer/internals/RecordAccumulator.java | 29 +++++++++++++++----
 .../kafka/api/BaseProducerSendTest.scala      |  2 +-
 .../kafka/api/PlaintextProducerSendTest.scala | 11 +++++++
 4 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index f70c1a338148..51f105c812a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -509,6 +509,10 @@ public void abortRecordAppends() {
         recordsBuilder.abort();
     }
 
+    public String topic() {
+        return topicPartition.topic();
+    }
+
     public boolean isClosed() {
         return recordsBuilder.isClosed();
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ec50dc9bc26d..cd29f64d5ee6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -93,6 +93,8 @@ public class RecordAccumulator {
     private final TransactionManager transactionManager;
     private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
 
+    Map<String, Compression> fakeCompression = Map.of("test", Compression.gzip().build(), "topic", Compression.lz4().build());
+
     /**
      * Create a new record accumulator
      *
@@ -296,6 +298,12 @@ public RecordAppendResult append(String topic,
                                      Cluster cluster) throws InterruptedException {
         TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
 
+
+        Compression newCompression = compression;
+        if (fakeCompression.containsKey(topic)) {
+            newCompression = fakeCompression.get(topic);
+        }
+
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -345,7 +353,7 @@ public RecordAppendResult append(String topic,
 
                 if (buffer == null) {
                     byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
+                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, newCompression.type(), key, value, headers));
                     log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
                     // This call may block if we exhausted buffer space.
                     buffer = free.allocate(size, maxTimeToBlock);
@@ -408,7 +416,7 @@ private RecordAppendResult appendNewBatch(String topic,
             return appendResult;
         }
 
-        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
+        MemoryRecordsBuilder recordsBuilder = recordsBuilder(topic, buffer, apiVersions.maxUsableProduceMagic());
         ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
         FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                 callbacks, nowMs));
@@ -419,12 +427,17 @@ private RecordAppendResult appendNewBatch(String topic,
         return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
     }
 
-    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
+    private MemoryRecordsBuilder recordsBuilder(String topic, ByteBuffer buffer, byte maxUsableMagic) {
         if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                 "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
+        Compression newCompression = compression;
+        if (fakeCompression.containsKey(topic)) {
+            newCompression = fakeCompression.get(topic);
+        }
+
+        return MemoryRecords.builder(buffer, maxUsableMagic, newCompression, TimestampType.CREATE_TIME, 0L);
     }
 
     /**
@@ -534,7 +547,13 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
         // Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
         // is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
         // the split doesn't happen too often.
-        CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),
+
+        Compression newCompression = compression;
+        if (fakeCompression.containsKey(bigBatch.topicPartition.topic())) {
+            newCompression = fakeCompression.get(bigBatch.topicPartition.topic());
+        }
+
+        CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), newCompression.type(),
             Math.max(1.0f, (float) bigBatch.compressionRatio()));
         Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
         int numSplitBatches = dq.size();
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index be853d9d990b..013072d63ae2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -232,7 +232,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType): Unit = {
+  protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType, topic: String = topic): Unit = {
     val partition = 0
 
     val baseTimestamp = 123456L
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index c1ba3a1b83c4..0a4f17f6956c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -94,6 +94,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSendDifferentCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer(
+      compressionType = "gzip",
+      lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue)
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSendNonCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
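For illustration only (not part of the patch): the change amounts to resolving a per-topic Compression before a batch is sized, built, or re-estimated, falling back to the producer-wide setting when no override exists. A minimal standalone sketch of that lookup is below; the class name and main() driver are hypothetical, the hard-coded topic names simply mirror the fakeCompression map above, and the kafka-clients dependency is assumed on the classpath.

    import java.util.Map;

    import org.apache.kafka.common.compress.Compression;

    // Sketch only: mirrors the fakeCompression lookup added to RecordAccumulator in the patch.
    public class PerTopicCompressionSketch {

        // Hard-coded overrides copied from the patch; a real implementation would
        // presumably take these from producer configuration instead.
        private static final Map<String, Compression> TOPIC_COMPRESSION = Map.of(
            "test", Compression.gzip().build(),
            "topic", Compression.lz4().build());

        // Fall back to the producer-wide compression when there is no per-topic override,
        // which is what the "newCompression = compression" default does in the patch.
        static Compression compressionFor(String topic, Compression producerDefault) {
            return TOPIC_COMPRESSION.getOrDefault(topic, producerDefault);
        }

        public static void main(String[] args) {
            System.out.println(compressionFor("test", Compression.NONE).type());  // per-topic override (gzip)
            System.out.println(compressionFor("other", Compression.NONE).type()); // falls back to the producer default
        }
    }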