[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Open · wants to merge 21 commits into master · Changes from 13 commits
1 change: 1 addition & 0 deletions pom.xml
@@ -322,6 +322,7 @@ flexible messaging model and an intuitive client API.</description>
<extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
<oshi.version>6.4.0</oshi.version>
<checkerframework.version>3.33.0</checkerframework.version>
<it-unimi-dsi-fastutil.version>8.5.15</it-unimi-dsi-fastutil.version>
thetumbled marked this conversation as resolved.
</properties>

<dependencyManagement>
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
@@ -311,19 +312,61 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}

/**
* If we nack multiple messages in the same batch with different redelivery delays, each message should be
* redelivered with its own delay. However, because nacked messages are tracked by (ledgerId, entryId), all
* messages in the batch are redelivered at the same time.
* @throws Exception
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setup();
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
thetumbled marked this conversation as resolved.
.create();
// send two messages in the same batch
producer.sendAsync("test-0");
producer.sendAsync("test-1");
producer.flush();

// negative ack the first message
consumer.negativeAcknowledge(consumer.receive());
// wait for 2s, negative ack the second message
Thread.sleep(2000);
consumer.negativeAcknowledge(consumer.receive());

// now 2s has passed, the first message should be redelivered 1s later.
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg1);
}

@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
1 change: 1 addition & 0 deletions pulsar-client-all/pom.xml
@@ -206,6 +206,7 @@
<include>org.tukaani:xz</include>
<!-- Issue #6834, Since Netty ByteBuf shaded, we need also shade this module -->
<include>org.apache.pulsar:pulsar-client-messagecrypto-bc</include>
<include>it.unimi.dsi:*</include>
thetumbled marked this conversation as resolved.
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
1 change: 1 addition & 0 deletions pulsar-client-shaded/pom.xml
@@ -168,6 +168,7 @@
<include>org.tukaani:xz</include>
<!-- Issue #6834, Since Netty ByteBuf shaded, we need also shade this module -->
<include>org.apache.pulsar:pulsar-client-messagecrypto-bc</include>
<include>it.unimi.dsi:*</include>
Member:

This isn't the correct solution for shading fastutil. Please see https://github.com/apache/pulsar/pull/23600/files#r1848136065

Member Author:

I have updated the shade configuration, please review again. Thanks.

</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
11 changes: 11 additions & 0 deletions pulsar-client/pom.xml
@@ -207,6 +207,17 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<version>${it-unimi-dsi-fastutil.version}</version>
thetumbled marked this conversation as resolved.
</dependency>

</dependencies>

<build>
@@ -2740,7 +2740,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
MessageId messageId = NegativeAcksTracker.discardPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
@@ -2751,7 +2751,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
MessageId id = NegativeAcksTracker.discardPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
@@ -22,50 +22,52 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private ConcurrentLongLongPairHashMap nackedMessages = null;
// timestamp -> ledgerId -> entryId; the batch index does not need to be tracked. If different messages
// have different timestamps, there will be multiple entries in the map.
// AVL tree -> Long2ObjectMap -> Roaring64Bitmap:
// there are many timestamps, a few ledgerIds, and many entryIds.
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
private final long nackDelayMs;
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
private final int negativeAckPrecisionBitCnt;

private Timeout timeout;

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
private static final long MIN_NACK_DELAY_MS = 100;
private static final int DUMMY_PARTITION_INDEX = -2;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_MS);
this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
if (negativeAckRedeliveryBackoff != null) {
this.timerIntervalNanos = Math.max(
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
MIN_NACK_DELAY_NANOS) / 3;
} else {
this.timerIntervalNanos = nackDelayNanos / 3;
}
this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
}

private synchronized void triggerRedelivery(Timeout t) {
@@ -76,28 +78,52 @@ private synchronized void triggerRedelivery(Timeout t) {

// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
long currentTimestamp = System.currentTimeMillis();
for (long timestamp : nackedMessages.keySet()) {
if (timestamp > currentTimestamp) {
// We are done with all the messages that need to be redelivered
break;
}
});

Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
}
}

if (!messagesToRedeliver.isEmpty()) {
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
// remove entries from the nackedMessages map
LongBidirectionalIterator iterator = nackedMessages.keySet().iterator();
while (iterator.hasNext()) {
long timestamp = iterator.nextLong();
if (timestamp <= currentTimestamp) {
iterator.remove();
} else {
break;
}
}
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
}

this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
if (!nackedMessages.isEmpty()) {
long nextTriggerTimestamp = nackedMessages.firstLongKey();
long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0);
this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
} else {
this.timeout = null;
}
}

public synchronized void add(MessageId messageId) {
@@ -108,39 +134,76 @@ public synchronized void add(Message<?> message) {
add(message.getMessageId(), message.getRedeliveryCount());
}

static long trimLowerBit(long timestamp, int bits) {
return timestamp & (-1L << bits);
}

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
nackedMessages = new Long2ObjectAVLTreeMap<>();
}

long backoffNs;
long backoffMs;
Member:

Could you use nanoseconds? Then you would need System.nanoTime() instead of System.currentTimeMillis(); System.nanoTime() is fast, since it is implemented directly by the JVM.

Member Author (@thetumbled), Nov 20, 2024:

  • Nanoseconds are not convenient for this calculation, because we need to reason about the precision of the timestamp.
    For example, with negativeAckPrecisionBitCnt = 10, we know the redelivery time may be at most 2^10 = 1024 ms ~= 1 s earlier than requested; we just trim the lower bits to bucket the time.
    But with nanoseconds (1 ms = 1,000,000 ns) we cannot choose a suitable configuration value as easily.
  • It is also unnecessary to use nanosecond timestamps: the timer tick is 1 ms, and message delivery latency is at the millisecond level.
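To make the bucketing concrete, here is a minimal standalone sketch (illustrative only, not part of this PR) of the same trimLowerBit arithmetic; with bits = 8 the bucket width is 2^8 = 256 ms:

public class NackBucketingDemo {
    // Same arithmetic as NegativeAcksTracker.trimLowerBit: zero out the low bits,
    // rounding the redelivery deadline down to the start of its bucket.
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    public static void main(String[] args) {
        long t1 = 1_000_000_123L; // redelivery deadline in ms
        long t2 = 1_000_000_200L; // a deadline 77 ms later
        System.out.println(trimLowerBit(t1, 8)); // 1000000000
        System.out.println(trimLowerBit(t2, 8)); // 1000000000 -> same bucket, one map entry
        // Rounding down means redelivery can fire up to 2^bits - 1 ms early, never late.
    }
}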

if (negativeAckRedeliveryBackoff != null) {
backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
backoffMs = negativeAckRedeliveryBackoff.next(redeliveryCount);
} else {
backoffMs = nackDelayMs;
}
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
if (nackedMessages.containsKey(timestamp)) {
Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
if (ledgerMap.containsKey(messageIdAdv.getLedgerId())) {
Roaring64Bitmap entrySet = ledgerMap.get(messageIdAdv.getLedgerId());
entrySet.add(messageIdAdv.getEntryId());
} else {
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.add(messageIdAdv.getEntryId());
ledgerMap.put(messageIdAdv.getLedgerId(), entrySet);
}
} else {
backoffNs = nackDelayNanos;
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.add(messageIdAdv.getEntryId());
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectAVLTreeMap<>();
ledgerMap.put(messageIdAdv.getLedgerId(), entrySet);
nackedMessages.put(timestamp, ledgerMap);
}
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for the same period. Leave a small buffer so that a
// nack arriving immediately after the current one is batched into the same redelivery request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
}
}

/**
* Discard the partition index from the message id, so that ids which differ only by partition index
* are treated as the same message.
*
* @param messageId the message id, possibly carrying a partition index
* @return a copy of the message id with the partition index replaced by a dummy value
*/
public static MessageId discardPartitionIndex(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
return new BatchMessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
DUMMY_PARTITION_INDEX, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(),
batchMessageId.getAckSet());
} else if (messageId instanceof MessageIdImpl) {
MessageIdImpl messageID = (MessageIdImpl) messageId;
return new MessageIdImpl(messageID.getLedgerId(), messageID.getEntryId(), DUMMY_PARTITION_INDEX);
} else {
return messageId;
}
}
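A quick illustration (my example, not code from this PR): two ids that differ only in their partition index are unequal as-is, but become equal once the partition index is discarded, which is what lets removeExpiredMessagesFromQueue match queued messages against ids produced by the tracker:

MessageId p0 = new MessageIdImpl(3L, 1L, 0);   // partition 0
MessageId p1 = new MessageIdImpl(3L, 1L, 1);   // partition 1, same ledger/entry
assert !p0.equals(p1);                         // the partition index takes part in equality
assert NegativeAcksTracker.discardPartitionIndex(p0)
        .equals(NegativeAcksTracker.discardPartitionIndex(p1)); // both now carry DUMMY_PARTITION_INDEX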

@VisibleForTesting
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
synchronized long getNackedMessagesCount() {
if (nackedMessages == null) {
return 0;
}
return nackedMessages.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}
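For reviewers who want to exercise the new data structure in isolation, here is a rough self-contained sketch (class and method names are mine; it assumes only fastutil and RoaringBitmap on the classpath) of the add/drain cycle the tracker now performs:

import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import java.util.Iterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;

public class NackedMessagesSketch {
    // timestamp bucket -> ledgerId -> entryIds
    private final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nacked =
            new Long2ObjectAVLTreeMap<>();

    void add(long bucketTimestamp, long ledgerId, long entryId) {
        Long2ObjectMap<Roaring64Bitmap> ledgerMap = nacked.get(bucketTimestamp);
        if (ledgerMap == null) {
            ledgerMap = new Long2ObjectAVLTreeMap<>();
            nacked.put(bucketTimestamp, ledgerMap);
        }
        Roaring64Bitmap entrySet = ledgerMap.get(ledgerId);
        if (entrySet == null) {
            entrySet = new Roaring64Bitmap();
            ledgerMap.put(ledgerId, entrySet);
        }
        entrySet.add(entryId);
    }

    // Drain every bucket whose timestamp is due and count the drained entries.
    long drainDue(long now) {
        long drained = 0;
        Iterator<Long2ObjectMap.Entry<Long2ObjectMap<Roaring64Bitmap>>> it =
                nacked.long2ObjectEntrySet().iterator();
        while (it.hasNext()) {
            Long2ObjectMap.Entry<Long2ObjectMap<Roaring64Bitmap>> bucket = it.next();
            if (bucket.getLongKey() > now) {
                break; // the map is sorted, so every remaining bucket is in the future
            }
            for (Roaring64Bitmap entries : bucket.getValue().values()) {
                drained += entries.getLongCardinality();
            }
            it.remove();
        }
        return drained;
    }
}

getNackedMessagesCount above is the same double sum over this structure, and triggerRedelivery is essentially drainDue plus the actual redelivery call.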

@Override
@@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);

@ApiModelProperty(
name = "negativeAckPrecisionBitCnt",
value = "The redelivery time precision bit count. The lower bits of the redelivery time will be"
+ "trimmed to reduce the memory occupation.\nThe default value is 8, which means the"
+ "redelivery time will be bucketed by 256ms, the redelivery time could be earlier(no later)"
+ "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be"
+ "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
)
private int negativeAckPrecisionBitCnt = 8;
Member:

Could you consider 0 as the default value? Users who want the optimization could opt in later, and the default would stay consistent with the previous behavior.

Member Author:

I suggest keeping this as the default configuration value. Without this enhancement it is unrealistic to use negative acknowledgement in production, because the memory occupation inflates very quickly.
But I will listen to the community: if more people think this feature should be disabled by default, I will update it.
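If this lands as-is, the new field can presumably be set like any other ConsumerConfigurationData field through the existing ConsumerBuilder.loadConf(...). A hedged sketch (topic and subscription names are made up, and I am assuming no dedicated builder method is added):

Map<String, Object> config = new HashMap<>();
config.put("negativeAckPrecisionBitCnt", 10); // bucket redelivery deadlines by 2^10 ms ~= 1 s
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-sub")
        .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
        .loadConf(config)
        .subscribe();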


@ApiModelProperty(
name = "maxTotalReceiverQueueSizeAcrossPartitions",
value = "The max total receiver queue size across partitions.\n"