Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Nov 23, 2024
1 parent fd9de50 commit 8ba25d2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ public void abortRecordAppends() {
recordsBuilder.abort();
}

public String topic() {
return topicPartition.topic();
}

public boolean isClosed() {
return recordsBuilder.isClosed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class RecordAccumulator {
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.

Map<String, Compression> fakeCompression = Map.of("test", Compression.gzip().build(), "topic", Compression.lz4().build());

/**
* Create a new record accumulator
*
Expand Down Expand Up @@ -296,6 +298,12 @@ public RecordAppendResult append(String topic,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));


Compression newCompression = compression;
if (fakeCompression.containsKey(topic)) {
newCompression = fakeCompression.get(topic);
}

// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
Expand Down Expand Up @@ -345,7 +353,7 @@ public RecordAppendResult append(String topic,

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, newCompression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
Expand Down Expand Up @@ -408,7 +416,7 @@ private RecordAppendResult appendNewBatch(String topic,
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
MemoryRecordsBuilder recordsBuilder = recordsBuilder(topic, buffer, apiVersions.maxUsableProduceMagic());
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
Expand All @@ -419,12 +427,17 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
private MemoryRecordsBuilder recordsBuilder(String topic, ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
Compression newCompression = compression;
if (fakeCompression.containsKey(topic)) {
newCompression = fakeCompression.get(topic);
}

return MemoryRecords.builder(buffer, maxUsableMagic, newCompression, TimestampType.CREATE_TIME, 0L);
}

/**
Expand Down Expand Up @@ -534,7 +547,13 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),

Compression nweCompression = compression;
if (fakeCompression.containsKey(bigBatch.topicPartition.topic())) {
nweCompression = fakeCompression.get(bigBatch.topicPartition.topic());
}

CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), nweCompression.type(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}

protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType): Unit = {
protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType, topic: String = topic): Unit = {
val partition = 0

val baseTimestamp = 123456L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendDifferentCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
val producer = createProducer(
compressionType = "gzip",
lingerMs = Int.MaxValue,
deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendNonCompressedMessageWithLogAppendTime(quorum: String, groupProtocol: String): Unit = {
Expand Down

0 comments on commit 8ba25d2

Please sign in to comment.