Skip to content

Commit

Permalink
[fix][broker] Cleanup correctly heartbeat bundle ownership when handl…
Browse files Browse the repository at this point in the history
…ing broker deletion event (apache#21083)
  • Loading branch information
Demogorgon314 authored Sep 4, 2023
1 parent 2921a41 commit b26ee8a
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,10 @@ private void checkDeadlockedThreads() {


private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
String lookupServiceAddress = pulsar().getLookupServiceAddress();
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
? NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress, pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(lookupServiceAddress, pulsar().getConfiguration());
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
final String messageStr = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -1214,10 +1217,9 @@ private synchronized void doCleanup(String broker) {
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
String heartbeatNamespace =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
.toString();
String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration()).toString();
NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), broker)).toString();
String heartbeatNamespaceV2 =
NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();

Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public class NamespaceService implements AutoCloseable {
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s:%s";
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";

private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
Expand Down Expand Up @@ -164,7 +164,7 @@ public class NamespaceService implements AutoCloseable {
*/
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
host = pulsar.getAdvertisedAddress();
this.host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
Expand Down Expand Up @@ -332,15 +332,17 @@ private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable Serv
* @throws PulsarServerException if an unexpected error occurs
*/
public void registerBootstrapNamespaces() throws PulsarServerException {

String lookupServiceAddress = pulsar.getLookupServiceAddress();
// ensure that we own the heartbeat namespace
if (registerNamespace(getHeartbeatNamespace(host, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespace(host, config));
if (registerNamespace(getHeartbeatNamespace(lookupServiceAddress, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}",
getHeartbeatNamespace(lookupServiceAddress, config));
}

// ensure that we own the heartbeat namespace
if (registerNamespace(getHeartbeatNamespaceV2(host, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespaceV2(host, config));
if (registerNamespace(getHeartbeatNamespaceV2(lookupServiceAddress, config), true)) {
LOG.info("added heartbeat namespace name in local cache: ns={}",
getHeartbeatNamespaceV2(lookupServiceAddress, config));
}

// we may not need strict ownership checking for bootstrap names for now
Expand Down Expand Up @@ -1579,24 +1581,12 @@ public void unloadSLANamespace() throws Exception {
LOG.info("Namespace {} unloaded successfully", namespaceName);
}

public static NamespaceName getHeartbeatNamespace(String host, ServiceConfiguration config) {
Integer port = null;
if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port));
public static NamespaceName getHeartbeatNamespace(String lookupBroker, ServiceConfiguration config) {
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), lookupBroker));
}

public static NamespaceName getHeartbeatNamespaceV2(String host, ServiceConfiguration config) {
Integer port = null;
if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, host, port));
public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, ServiceConfiguration config) {
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker));
}

public static NamespaceName getSLAMonitorNamespace(String host, ServiceConfiguration config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ public void brokers() throws Exception {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public void brokers() throws Exception {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,14 +1046,14 @@ public void testListTopic() throws Exception {
@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());

NamespaceName heartbeatNamespacePulsar2V1 =
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());

NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -747,11 +750,41 @@ public void handleBrokerDeletionEventTest()
String broker = lookupServiceAddress1;
channel1.publishAssignEventAsync(bundle1, broker);
channel2.publishAssignEventAsync(bundle2, broker);

waitUntilNewOwner(channel1, bundle1, broker);
waitUntilNewOwner(channel2, bundle1, broker);
waitUntilNewOwner(channel1, bundle2, broker);
waitUntilNewOwner(channel2, bundle2, broker);

// Register the broker-1 heartbeat namespace bundle.
String heartbeatNamespaceBroker1V1 = NamespaceName
.get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), broker)).toString();
String heartbeatNamespaceBroker1V2 = NamespaceName
.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, broker)).toString();
String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1 + "/0x00000000_0xfffffff0";
String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2 + "/0x00000000_0xfffffff0";
channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle, broker);
channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle, broker);

// Register the broker-2 heartbeat namespace bundle.
String heartbeatNamespaceBroker2V1 = NamespaceName
.get(String.format(HEARTBEAT_NAMESPACE_FMT, conf.getClusterName(), lookupServiceAddress2)).toString();
String heartbeatNamespaceBroker2V2 = NamespaceName
.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupServiceAddress2)).toString();
String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1 + "/0x00000000_0xfffffff0";
String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2 + "/0x00000000_0xfffffff0";
channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, lookupServiceAddress2);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, lookupServiceAddress2);

// Verify to transfer the ownership to the other broker.
channel1.publishUnloadEventAsync(new Unload(broker, bundle1, Optional.of(lookupServiceAddress2)));
waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
Expand All @@ -765,12 +798,24 @@ public void handleBrokerDeletionEventTest()
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, NotificationType.Deleted);

waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);

waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);

waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);

verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any());

Expand All @@ -780,11 +825,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
1,
2,
0,
1,
7,
0,
1,
2,
0,
0);

Expand All @@ -811,11 +856,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
1,
2,
0,
1,
7,
0,
2,
3,
0,
0);

Expand All @@ -832,11 +877,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
1,
2,
0,
1,
7,
0,
2,
3,
0,
1);

Expand All @@ -854,11 +899,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
1,
2,
0,
1,
7,
0,
3,
4,
0,
1);

Expand All @@ -876,11 +921,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
2,
0,
3,
0,
3,
9,
0,
4,
0,
1);

Expand All @@ -905,11 +950,11 @@ public void handleBrokerDeletionEventTest()
});

validateMonitorCounters(leaderChannel,
2,
0,
3,
0,
3,
9,
0,
4,
1,
1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ public void testSplitBundleWithHighestThroughput() throws Exception {

@Test
public void testHeartbeatNamespaceMatch() throws Exception {
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), conf);
NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,8 +1595,10 @@ public void testIsSystemTopic() {

assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfig());
NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig());
NamespaceName heartbeatNamespaceV1 = NamespaceService
.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), pulsar.getConfig());
NamespaceName heartbeatNamespaceV2 = NamespaceService
.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), pulsar.getConfig());
assertTrue(brokerService.isSystemTopic("persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"));
assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() + "/healthcheck"));
}
Expand Down
Loading

0 comments on commit b26ee8a

Please sign in to comment.