[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
Expand Down Expand Up @@ -311,19 +312,61 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 3L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}

/**
* If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered
* with the correct delay. However, all messages are redelivered at the same time.
* @throws Exception
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setup();
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
// send two messages in the same batch
producer.sendAsync("test-0");
producer.sendAsync("test-1");
producer.flush();

// negative ack the first message
consumer.negativeAcknowledge(consumer.receive());
// wait for 2s, negative ack the second message
Thread.sleep(2000);
consumer.negativeAcknowledge(consumer.receive());

// now 2s has passed, the first message should be redelivered 1s later.
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg1);
}

@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client/pom.xml
Expand Up @@ -207,6 +207,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
Comment on lines +210 to +218
Member

It will be necessary to update the pulsar-client-shaded and pulsar-client-all shading configuration to shade these libraries, since the client has not used them until now.

Avoiding client jar growth is something that we need to address when adding new dependencies.

I realized that including the fastutil library will increase the client jar file size significantly. One detail about fastutil is that there is a smaller alternative, fastutil-core, which includes a subset of the classes. fastutil is about 23MB and fastutil-core is about 6MB.

Picking only the needed classes from fastutil is possible with minimizeJar in Maven's shade plugin. However, minimizeJar is a global setting, so this would require building a minimized version of fastutil for client use; applying minimizeJar to the complete Pulsar client jar would be a very large change that could break things.

Member Author

> It will be necessary to update pulsar-client-shaded and pulsar-client-all shading configuration to shade these libraries since they haven't been used on the client by now.

I am not familiar with the shading configuration; I hope this commit modifies it correctly.
28251b2

> I realized that including the fastutil library will increase the client jar file size significantly. One detail about fastutil is that there's a smaller library alternative fastutil-core which includes a subset of the classes. fastutil is about 23MB and fastutil-core is about 6MB.
> Picking only the classes that are needed from fastutil is possible with minimizeJar with maven's shade plugin, but since minimizeJar is a global setting, it would require building a minimized version of fastutil for the client usage to pick only the classes are needed since applying minimizeJar to the complete Pulsar client jar would be a very large change which could break things.

I have updated the dependency to fastutil-core.
As for minimizeJar, it could shrink the jar to the minimum, but I think fastutil could improve Pulsar in many other places, so its other data structures may be useful too.

Member

@lhotari, Nov 19, 2024

This comment cannot be marked resolved before this is really addressed.

Due to the large size of both fastutil (23MB) and fastutil-core (6MB), it is necessary to add a new module that minimizes fastutil for use by the shaded Pulsar client (in the published pulsar-client and pulsar-client-all modules). The minimized fastutil module shouldn't be published to Maven Central at all, but it has to be a separate module because the maven-shade-plugin cannot selectively minimize modules.

Member Author

Minimizing fastutil is a later step, right?
For now we need to fix the shading problem. Isn't the following enough?

<include>it.unimi.dsi:*</include>

I would appreciate it if you could help shade the fastutil dependency.

Member

It looks like you should use the fastutil-core instead of fastutil, and then we can make a new PR to reduce the fastutil-core size.

And then use <include>it.unimi.dsi:fastutil-core</include>.

Member

@lhotari, Nov 26, 2024

> It looks like you should use the fastutil-core instead of fastutil, and then we can make a new PR to reduce the fastutil-core size.
>
> And then use <include>it.unimi.dsi:fastutil-core</include>.

@nodece That's a solution that isn't optimal:

  • In the broker, we also need the full fastutil library and it would result in duplicate libraries fastutil-core and fastutil in the same flat classpath.
  • fastutil-core library is also very large in size, ≅6MB. We only need a few classes from the library.

The optimal solution would be to include only the classes from fastutil into the shaded pulsar-client and pulsar-client-all which are really used and needed.

This could be achieved in many ways. One possible solution is to introduce an intermediate module for shaded pulsar-client and pulsar-client-all that isn't published to maven central at all. It would be used to minimize and include only the classes from fastutil which are required by pulsar-client shading.


</dependencies>

<build>
Expand Down
Expand Up @@ -2740,7 +2740,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
MessageId messageId = NegativeAcksTracker.discardPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
Expand All @@ -2751,7 +2751,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
MessageId id = NegativeAcksTracker.discardPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
Expand Down
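
The reason for normalizing the partition index before the `contains` check can be sketched with a stand-in ID type. `SketchMessageId` is hypothetical and is not Pulsar's `MessageIdImpl`; the sketch assumes that equality covers ledger ID, entry ID, and partition index, and that the tracker hands back IDs carrying the dummy index -2, as `DUMMY_PARTITION_INDEX` in this diff suggests.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a message ID; not Pulsar's MessageIdImpl.
final class SketchMessageId {
    static final int DUMMY_PARTITION_INDEX = -2;

    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    SketchMessageId(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    // Returns a copy whose partition index is replaced by the dummy value.
    SketchMessageId discardPartitionIndex() {
        return new SketchMessageId(ledgerId, entryId, DUMMY_PARTITION_INDEX);
    }

    // Assumption: equality includes the partition index.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchMessageId)) {
            return false;
        }
        SketchMessageId other = (SketchMessageId) o;
        return ledgerId == other.ledgerId && entryId == other.entryId
                && partitionIndex == other.partitionIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partitionIndex);
    }

    public static void main(String[] args) {
        Set<SketchMessageId> expired = new HashSet<>();
        expired.add(new SketchMessageId(5, 9, DUMMY_PARTITION_INDEX));
        SketchMessageId queued = new SketchMessageId(5, 9, 3);
        // Lookup with the real partition index, then with the normalized ID.
        System.out.println(expired.contains(queued));
        System.out.println(expired.contains(queued.discardPartitionIndex()));
    }
}
```

Under these assumptions, a queued message only matches an expired entry after its partition index has been normalized the same way.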
Expand Up @@ -22,50 +22,50 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private ConcurrentLongLongPairHashMap nackedMessages = null;
// timestamp -> ledgerId -> entryId, no need to batch index, if different messages have
// different timestamp, there will be multiple entries in the map
// AVL Tree -> LongOpenHashMap -> Roaring64Bitmap
// there are many timestamp, a few ledgerId, many entryId
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
private final long nackDelayMs;
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
private final int negativeAckPrecisionBitCnt;

private Timeout timeout;

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
private static final long MIN_NACK_DELAY_MS = 100;
private static final int DUMMY_PARTITION_INDEX = -2;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_MS);
this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
if (negativeAckRedeliveryBackoff != null) {
this.timerIntervalNanos = Math.max(
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
MIN_NACK_DELAY_NANOS) / 3;
} else {
this.timerIntervalNanos = nackDelayNanos / 3;
}
this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
}

private synchronized void triggerRedelivery(Timeout t) {
Expand All @@ -76,28 +76,50 @@ private synchronized void triggerRedelivery(Timeout t) {

// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to convert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
long currentTimestamp = System.currentTimeMillis();
for (long timestamp : nackedMessages.keySet()) {
if (timestamp > currentTimestamp) {
// We are done with all the messages that need to be redelivered
break;
}
});

Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
}
}

if (!messagesToRedeliver.isEmpty()) {
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
// remove entries from the nackedMessages map
for (long timestamp : nackedMessages.keySet()) {
if (timestamp <= currentTimestamp) {
nackedMessages.remove(timestamp);
} else {
break;
}
}
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
}

this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
if (!nackedMessages.isEmpty()) {
long nextTriggerTimestamp = nackedMessages.firstLongKey();
long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0);
if (delayMs > 0) {
this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
} else {
this.timeout = timer.newTimeout(this::triggerRedelivery, 0, TimeUnit.MILLISECONDS);
}
} else {
this.timeout = null;
}
}

public synchronized void add(MessageId messageId) {
Expand All @@ -108,39 +130,76 @@ public synchronized void add(Message<?> message) {
add(message.getMessageId(), message.getRedeliveryCount());
}

static long trimLowerBit(long timestamp, int bits) {
return timestamp & (-1L << bits);
}

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
nackedMessages = new Long2ObjectAVLTreeMap<>();
}

long backoffNs;
long backoffMs;
Member

Could you use nanoseconds? You would then use System.nanoTime() instead of System.currentTimeMillis(); System.nanoTime() is fast on the JVM.

Member Author

@thetumbled, Nov 20, 2024

  • Nanoseconds are inconvenient for this calculation because we need to estimate the precision of the timestamp.
    For example, with negativeAckPrecisionBitCnt = 10, the redelivery time may be earlier by at most 2^10 = 1024 ms ≈ 1 s, because we trim the lower negativeAckPrecisionBitCnt bits to bucket the time.
    With nanoseconds (1 ms = 1,000,000 ns), it is hard to pick a suitable configuration value.
  • A nanosecond timestamp is unnecessary: the timer tick time is 1 ms, and message delivery latency is at the millisecond level.
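
The bucketing arithmetic in this reply can be checked with a small sketch. `trimLowerBit` is copied from the diff; the class name `NackBucketDemo`, the helper `maxEarlyMs`, and the sample timestamp are illustrative assumptions only. Clearing the lower k bits rounds a millisecond timestamp down to a multiple of 2^k, so the bucketed time is never later than the requested time and is at most 2^k - 1 ms earlier.

```java
class NackBucketDemo {
    // Same bit trick as NegativeAcksTracker.trimLowerBit in this PR.
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    // Largest amount by which trimming can move a timestamp earlier, in ms.
    static long maxEarlyMs(int bits) {
        return (1L << bits) - 1;
    }

    public static void main(String[] args) {
        long redeliverAt = 1_000_000L; // hypothetical redelivery time in ms
        for (int bits : new int[] {0, 8, 10}) {
            long bucket = trimLowerBit(redeliverAt, bits);
            System.out.printf("bits=%d bucket=%d early=%dms (max %dms)%n",
                    bits, bucket, redeliverAt - bucket, maxEarlyMs(bits));
        }
    }
}
```

Because the bucket width is a power of two in milliseconds, the configuration value maps directly to a worst-case error, which would not hold as cleanly for a nanosecond timestamp.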

if (negativeAckRedeliveryBackoff != null) {
backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
backoffMs = TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount));
} else {
backoffNs = nackDelayNanos;
backoffMs = nackDelayMs;
}
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
if (nackedMessages.containsKey(timestamp)) {
Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
if (ledgerMap.containsKey(messageIdAdv.getLedgerId())) {
Roaring64Bitmap entrySet = ledgerMap.get(messageIdAdv.getLedgerId());
entrySet.add(messageIdAdv.getEntryId());
} else {
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.add(messageIdAdv.getEntryId());
ledgerMap.put(messageIdAdv.getLedgerId(), entrySet);
}
} else {
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.add(messageIdAdv.getEntryId());
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectAVLTreeMap<>();
ledgerMap.put(messageIdAdv.getLedgerId(), entrySet);
nackedMessages.put(timestamp, ledgerMap);
}
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
// nack immediately following the current one will be batched into the same redeliver request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
}
}

/**
* Discard the partition index from the message id.
*
* @param messageId
* @return
*/
public static MessageId discardPartitionIndex(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
return new BatchMessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
DUMMY_PARTITION_INDEX, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(),
batchMessageId.getAckSet());
} else if (messageId instanceof MessageIdImpl) {
MessageIdImpl messageID = (MessageIdImpl) messageId;
return new MessageIdImpl(messageID.getLedgerId(), messageID.getEntryId(), DUMMY_PARTITION_INDEX);
} else {
return messageId;
}
}

@VisibleForTesting
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
synchronized long getNackedMessagesCount() {
if (nackedMessages == null) {
return 0;
}
return nackedMessages.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}

@Override
Expand Down
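
The three-level index used by the new tracker (timestamp bucket, then ledger ID, then entry IDs) can be sketched with JDK collections only. This is a simplified stand-in under stated assumptions, not the PR's implementation: `TreeMap`, `HashMap`, and `TreeSet` replace the fastutil and RoaringBitmap types, and the names `NackIndexSketch`, `drainExpired`, and `nextDelayMs` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class NackIndexSketch {
    // timestamp bucket -> ledgerId -> entryIds (JDK stand-ins for the PR's types).
    private final TreeMap<Long, Map<Long, TreeSet<Long>>> nacked = new TreeMap<>();

    void add(long bucketMs, long ledgerId, long entryId) {
        nacked.computeIfAbsent(bucketMs, t -> new HashMap<>())
              .computeIfAbsent(ledgerId, l -> new TreeSet<>())
              .add(entryId);
    }

    // Removes every bucket due at or before nowMs and returns its {ledgerId, entryId} pairs.
    List<long[]> drainExpired(long nowMs) {
        List<long[]> due = new ArrayList<>();
        Map<Long, Map<Long, TreeSet<Long>>> expired = nacked.headMap(nowMs, true);
        expired.values().forEach(ledgers -> ledgers.forEach((ledgerId, entries) ->
                entries.forEach(entryId -> due.add(new long[] {ledgerId, entryId}))));
        expired.clear(); // clearing the view removes the buckets from the backing map
        return due;
    }

    // Delay until the next timer should fire, or -1 when nothing is pending.
    long nextDelayMs(long nowMs) {
        return nacked.isEmpty() ? -1 : Math.max(nacked.firstKey() - nowMs, 0);
    }

    // Total number of tracked entries across all buckets and ledgers.
    long size() {
        return nacked.values().stream()
                .flatMap(ledgers -> ledgers.values().stream())
                .mapToLong(TreeSet::size).sum();
    }

    public static void main(String[] args) {
        NackIndexSketch index = new NackIndexSketch();
        index.add(1_024, 7, 1);
        index.add(1_024, 7, 2);
        index.add(2_048, 7, 3);
        System.out.println(index.drainExpired(1_500).size() + " due, next in "
                + index.nextDelayMs(1_500) + " ms");
    }
}
```

Keeping the outer map sorted by timestamp lets the drain stop at the first bucket that is not yet due, and lets the next timer be scheduled from the smallest remaining key instead of polling at a fixed interval.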
Expand Up @@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);

@ApiModelProperty(
name = "negativeAckPrecisionBitCnt",
value = "The redelivery time precision bit count. The lower bits of the redelivery time will be "
+ "trimmed to reduce the memory occupation.\nThe default value is 8, which means the "
+ "redelivery time will be bucketed by 256ms, the redelivery time could be earlier (no later) "
+ "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be "
+ "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
)
private int negativeAckPrecisionBitCnt = 8;
Member

Could you consider 0 as the default value? Users may wish to submit later, which is consistent with previous behavior.

Member Author

I suggest defaulting to the best configuration value. Without this enhancement, it is unrealistic to use negative acknowledgement in production, because memory usage grows very fast.
But I will listen to the community: if more people think that disabling this feature by default is better, I will update it.
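
The effect of the precision setting on the number of timestamp keys can be illustrated with a rough sketch. The class `NackPrecisionDemo`, the method `distinctBuckets`, and the workload (one nack per millisecond from a hypothetical, bucket-aligned start time) are assumptions for illustration and say nothing about real workloads or actual memory figures.

```java
import java.util.TreeSet;

class NackPrecisionDemo {
    // Counts distinct timestamp buckets for nacks arriving 1 ms apart.
    static int distinctBuckets(long startMs, int nackCount, int precisionBits) {
        TreeSet<Long> buckets = new TreeSet<>();
        for (int i = 0; i < nackCount; i++) {
            // Same trimming as trimLowerBit in this PR.
            buckets.add((startMs + i) & (-1L << precisionBits));
        }
        return buckets.size();
    }

    public static void main(String[] args) {
        long startMs = 1L << 40; // hypothetical start time, aligned to a bucket boundary
        System.out.println("bits=0: " + distinctBuckets(startMs, 10_000, 0));
        System.out.println("bits=8: " + distinctBuckets(startMs, 10_000, 8));
    }
}
```

With a value of 0 every millisecond gets its own key, which matches the earlier behavior more closely in timing but gives up the grouping that the default of 8 provides.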


@ApiModelProperty(
name = "maxTotalReceiverQueueSizeAcrossPartitions",
value = "The max total receiver queue size across partitions.\n"
Expand Down