Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
Expand Down Expand Up @@ -319,11 +320,53 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 3L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}

/**
* If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered
* with the correct delay. However, all messages are redelivered at the same time.
* @throws Exception
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setup();
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
.create();
// send two messages in the same batch
producer.sendAsync("test-0");
producer.sendAsync("test-1");
producer.flush();

// negative ack the first message
consumer.negativeAcknowledge(consumer.receive());
// wait for 2s, negative ack the second message
Thread.sleep(2000);
consumer.negativeAcknowledge(consumer.receive());

// now 2s has passed, the first message should be redelivered 1s later.
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg1);
}

@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,7 @@ private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<M
if (!sendToDLQ) {
return new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setBatchIndex(messageId.getBatchIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId());
}
Expand Down Expand Up @@ -2740,7 +2741,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
MessageId messageId = NegativeAcksTracker.discardPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
Expand All @@ -2751,7 +2752,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
MessageId id = NegativeAcksTracker.discardPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentTripleLong2LongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private ConcurrentLongLongPairHashMap nackedMessages = null;
// map (ledgerId, entryId, batchIndex) -> timestamp
private ConcurrentTripleLong2LongHashMap nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
Expand All @@ -51,7 +52,7 @@ class NegativeAcksTracker implements Closeable {

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
private static final int DUMMY_PARTITION_INDEX = -2;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
Expand All @@ -77,20 +78,24 @@ private synchronized void triggerRedelivery(Timeout t) {
// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
nackedMessages.forEach((ledgerId, entryId, batchIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
MessageId msgId;
if (batchIndex == -1) {
msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
} else {
msgId = new BatchMessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX, (int) batchIndex);
}
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});

if (!messagesToRedeliver.isEmpty()) {
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
nackedMessages.remove(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getBatchIndex());
}
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
Expand All @@ -110,10 +115,7 @@ public synchronized void add(Message<?> message) {

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
nackedMessages = new ConcurrentTripleLong2LongHashMap();
}

long backoffNs;
Expand All @@ -122,14 +124,9 @@ private synchronized void add(MessageId messageId, int redeliveryCount) {
} else {
backoffNs = nackDelayNanos;
}
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);
messageIdAdv.getBatchIndex(), System.nanoTime() + backoffNs);

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
Expand All @@ -138,9 +135,28 @@ private synchronized void add(MessageId messageId, int redeliveryCount) {
}
}

/**
* Discard the partition index from the message id.
* @param messageId
* @return
*/
static public MessageId discardPartitionIndex(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
return new BatchMessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
DUMMY_PARTITION_INDEX, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(),
batchMessageId.getAckSet());
} else if (messageId instanceof MessageIdImpl) {
MessageIdImpl messageID = (MessageIdImpl) messageId;
return new MessageIdImpl(messageID.getLedgerId(), messageID.getEntryId(), DUMMY_PARTITION_INDEX);
} else {
return messageId;
}
}

@VisibleForTesting
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
return Optional.ofNullable(nackedMessages).map(ConcurrentTripleLong2LongHashMap::size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util.collections;

import java.util.HashMap;

public class ConcurrentTripleLong2LongHashMap {
public class TripleLong{
public long first;
public long second;
public long third;
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
@Override
public int hashCode() {
return Long.hashCode(first) ^ Long.hashCode(second) ^ Long.hashCode(third);
}
@Override
public boolean equals(Object obj) {
if(obj instanceof TripleLong){
TripleLong other = (TripleLong) obj;
return first == other.first && second == other.second && third == other.third;
}
return false;
}
}

private HashMap<TripleLong, Long> map;
public ConcurrentTripleLong2LongHashMap(){
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
// TODO: use hashmap for now
map = new HashMap<>();
}
public void put(long first, long second, long third, long value){
TripleLong key = new TripleLong();
key.first = first;
key.second = second;
key.third = third;
map.put(key, value);
}
public long get(long first, long second, long third){
TripleLong key = new TripleLong();
key.first = first;
key.second = second;
key.third = third;
return map.get(key);
}
public long remove(long first, long second, long third){
TripleLong key = new TripleLong();
key.first = first;
key.second = second;
key.third = third;
return map.remove(key);
}
public boolean containsKey(long first, long second, long third){
TripleLong key = new TripleLong();
key.first = first;
key.second = second;
key.third = third;
return map.containsKey(key);
}
public void clear(){
map.clear();
}
public long size(){
return map.size();
}
public boolean isEmpty() {
return map.isEmpty();
}

public interface TripleLongConsumer {
void call(long first, long second, long third, long value);
}
public void forEach(TripleLongConsumer consumer){
for(TripleLong key : map.keySet()){
consumer.call(key.first, key.second, key.third, map.get(key));
}
}

}
Loading