From 2c41162ef1274f0c2d9a09ab4b44709b73045074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Sun, 26 Jan 2025 21:53:20 +0800 Subject: [PATCH 1/4] [INLONG-11711][SDK] SortSDK shares the same PulsarClient among different SortTasks to avoid performance bottlenecks caused by too many PulsarClients --- .../sort/manager/InlongMultiTopicManager.java | 1 + .../manager/InlongSingleTopicManager.java | 3 +- .../sdk/sort/manager/InlongTopicManager.java | 58 ++++++++++--------- 3 files changed, 35 insertions(+), 27 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java index 2d098345cac..8100ebd5e3e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java @@ -220,6 +220,7 @@ private void updatePulsarFetcher(String clusterId, List topics) { auth = AuthenticationFactory.token(token); } PulsarClient pulsarClient = PulsarClient.builder() + .useNoopDnsResolver(true) .serviceUrl(topic.getInLongCluster().getBootstraps()) .authentication(auth) .build(); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index d137a903d2a..195261eaa44 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -66,7 +66,7 @@ public class InlongSingleTopicManager extends TopicManager { private static final Logger LOGGER = LoggerFactory.getLogger(InlongSingleTopicManager.class); private final ConcurrentHashMap fetchers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap pulsarClients = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap pulsarClients = new ConcurrentHashMap<>(); private final ConcurrentHashMap tubeFactories = new ConcurrentHashMap<>(); private final PeriodicTask updateMetaDataWorker; @@ -370,6 +370,7 @@ private boolean checkAndCreateNewPulsarClient(InLongTopic topic) { auth = AuthenticationFactory.token(token); } PulsarClient pulsarClient = PulsarClient.builder() + .useNoopDnsResolver(true) .serviceUrl(topic.getInLongCluster().getBootstraps()) .authentication(auth) .build(); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index 6d3343293b2..32ac9453adc 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -65,7 +65,7 @@ public class InlongTopicManager extends TopicManager { private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private final Map fetchers = new ConcurrentHashMap<>(); - private final Map pulsarClients = new ConcurrentHashMap<>(); + private static final Map pulsarClients = new ConcurrentHashMap<>(); private final Map tubeFactories = new ConcurrentHashMap<>(); protected final ForkJoinPool pool; @@ -359,33 +359,39 @@ private void onlinePulsarClients() { private void createPulsarClient(CacheZoneCluster cluster) { LOGGER.info("start to init pulsar client for cluster={}", cluster); - if (cluster.getBootstraps() != null) { - try { - String token = cluster.getToken(); - Authentication auth = null; - if (StringUtils.isNoneBlank(token)) { - auth = AuthenticationFactory.token(token); - } - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(cluster.getBootstraps()) - .authentication(auth) - .build(); - LOGGER.info("create pulsar client succ cluster:{}, token:{}", - cluster.getClusterId(), - cluster.getToken()); - PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient); - if (oldClient != null && !oldClient.isClosed()) { - LOGGER.warn("close new pulsar client for cluster={}", cluster); - pulsarClient.close(); - } - } catch (Exception e) { - LOGGER.error("create pulsar client error for cluster={}", cluster, e); - return; - } - LOGGER.info("success to init pulsar client for cluster={}", cluster); - } else { + String clientKey = cluster.getBootstraps(); + if (clientKey == null) { LOGGER.error("bootstrap is null for cluster={}", cluster); + return; + } + if (pulsarClients.containsKey(clientKey)) { + LOGGER.info("Repeat to init pulsar client for cluster={}", cluster); + return; + } + try { + String token = cluster.getToken(); + Authentication auth = null; + if (StringUtils.isNoneBlank(token)) { + auth = AuthenticationFactory.token(token); + } + PulsarClient pulsarClient = PulsarClient.builder() + .useNoopDnsResolver(true) + .serviceUrl(cluster.getBootstraps()) + .authentication(auth) + .build(); + LOGGER.info("create pulsar client succ cluster:{}, token:{}", + cluster.getClusterId(), + cluster.getToken()); + PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient); + if (oldClient != null && !oldClient.isClosed()) { + LOGGER.warn("close new pulsar client for cluster={}", cluster); + pulsarClient.close(); + } + } catch (Exception e) { + LOGGER.error("create pulsar client error for cluster={}", cluster, e); + return; } + LOGGER.info("success to init pulsar client for cluster={}", cluster); } private List getCacheZoneClusters(InlongTopicTypeEnum type) { From b155956d3dc1f0e691ae94feb84dba652d5ab7cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Sun, 26 Jan 2025 22:12:30 +0800 Subject: [PATCH 2/4] [INLONG-11711][SDK] SortSDK shares the same PulsarClient among different SortTasks to avoid performance bottlenecks caused by too many PulsarClients --- .../apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java | 1 - .../apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java | 1 - .../org/apache/inlong/sdk/sort/manager/InlongTopicManager.java | 1 - 3 files changed, 3 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java index 8100ebd5e3e..2d098345cac 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java @@ -220,7 +220,6 @@ private void updatePulsarFetcher(String clusterId, List topics) { auth = AuthenticationFactory.token(token); } PulsarClient pulsarClient = PulsarClient.builder() - .useNoopDnsResolver(true) .serviceUrl(topic.getInLongCluster().getBootstraps()) .authentication(auth) .build(); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index 195261eaa44..8ced073b472 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -370,7 +370,6 @@ private boolean checkAndCreateNewPulsarClient(InLongTopic topic) { auth = AuthenticationFactory.token(token); } PulsarClient pulsarClient = PulsarClient.builder() - .useNoopDnsResolver(true) .serviceUrl(topic.getInLongCluster().getBootstraps()) .authentication(auth) .build(); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index 32ac9453adc..378d8deb3cf 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -375,7 +375,6 @@ private void createPulsarClient(CacheZoneCluster cluster) { auth = AuthenticationFactory.token(token); } PulsarClient pulsarClient = PulsarClient.builder() - .useNoopDnsResolver(true) .serviceUrl(cluster.getBootstraps()) .authentication(auth) .build(); From 035482c4084b5bdc497deb6c864055ffeac3ba94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Mon, 27 Jan 2025 10:21:53 +0800 Subject: [PATCH 3/4] do not close pulsar client when a sort task exit --- .../inlong/sdk/sort/manager/InlongSingleTopicManager.java | 2 +- .../org/apache/inlong/sdk/sort/manager/InlongTopicManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index 8ced073b472..b95329f2a65 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -199,7 +199,7 @@ public boolean clean() { } closeFetcher(); - closePulsarClient(); +// closePulsarClient(); closeTubeSessionFactory(); LOGGER.info("close finished {}", sortTaskId); return true; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index 378d8deb3cf..0f46fceb805 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -88,7 +88,7 @@ public boolean clean() { LOGGER.info("start to clean topic manager, sortTaskId={}", sortTaskId); stopAssign = true; closeAllFetchers(); - closeAllPulsarClients(); +// closeAllPulsarClients(); closeAllTubeFactories(); LOGGER.info("success to clean topic manager, sortTaskId={}", sortTaskId); return true; From 2ea22d6242deaa2a13db3199ed1b3ca19bb2a1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Mon, 27 Jan 2025 11:04:52 +0800 Subject: [PATCH 4/4] spotless --- .../inlong/sdk/sort/manager/InlongSingleTopicManager.java | 2 +- .../org/apache/inlong/sdk/sort/manager/InlongTopicManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index b95329f2a65..68d83c0286d 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -199,7 +199,7 @@ public boolean clean() { } closeFetcher(); -// closePulsarClient(); + // closePulsarClient(); closeTubeSessionFactory(); LOGGER.info("close finished {}", sortTaskId); return true; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index 0f46fceb805..0d7086de819 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -88,7 +88,7 @@ public boolean clean() { LOGGER.info("start to clean topic manager, sortTaskId={}", sortTaskId); stopAssign = true; closeAllFetchers(); -// closeAllPulsarClients(); + // closeAllPulsarClients(); closeAllTubeFactories(); LOGGER.info("success to clean topic manager, sortTaskId={}", sortTaskId); return true;