diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java index 8241e0802c..b1f9f0a073 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java @@ -28,6 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -135,6 +137,44 @@ public void waitFollowerShip() throws InterruptedException { LOG.info("Became the follower of AMS (Database lease)"); } + @Override + public void registAndElect() throws Exception { + boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE); + if (!isMasterSlaveMode) { + LOG.debug("Master-slave mode is not enabled, skip node registration"); + return; + } + // In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info + // This is similar to ZK mode registering ephemeral nodes + long now = System.currentTimeMillis(); + String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo); + try { + doAsIgnoreError( + HaLeaseMapper.class, + mapper -> { + int updated = + mapper.updateServerInfo( + clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now); + if (updated == 0) { + mapper.insertServerInfoIfAbsent( + clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now); + } + }); + LOG.info( + "Registered AMS node to database: nodeId={}, optimizingService={}", + nodeId, + optimizingServiceServerInfo); + } catch (Exception e) { + LOG.error("Failed to register node to database", e); + throw e; + } + } + + @Override + public boolean hasLeadership() { + return isLeader.get(); + } + /** Closes the 
heartbeat executor safely. */ @Override public void close() { @@ -147,9 +187,6 @@ public void close() { } } - @Override - public void registAndElect() throws Exception {} - private class HeartbeatRunnable implements Runnable { @Override public void run() { @@ -304,6 +341,40 @@ private void onLeaderLost() { } } + @Override + public List getAliveNodes() { + List aliveNodes = new ArrayList<>(); + if (!isLeader.get()) { + LOG.warn("Only leader node can get alive nodes list"); + return aliveNodes; + } + try { + long currentTime = System.currentTimeMillis(); + List leases = + getAs( + HaLeaseMapper.class, + mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE)); + for (HaLeaseMeta lease : leases) { + // Only include nodes with valid (non-expired) leases + if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > currentTime) { + if (lease.getServerInfoJson() != null && !lease.getServerInfoJson().isEmpty()) { + try { + AmsServerInfo nodeInfo = + JacksonUtil.parseObject(lease.getServerInfoJson(), AmsServerInfo.class); + aliveNodes.add(nodeInfo); + } catch (Exception e) { + LOG.warn("Failed to parse server info for node {}", lease.getNodeId(), e); + } + } + } + } + } catch (Exception e) { + LOG.error("Failed to get alive nodes from database", e); + throw e; + } + return aliveNodes; + } + private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) { AmsServerInfo amsServerInfo = new AmsServerInfo(); amsServerInfo.setHost(host); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java index 041c1e4693..118b87d2a6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java @@ -18,6 +18,10 @@ package org.apache.amoro.server.ha; +import org.apache.amoro.client.AmsServerInfo; + 
+import java.util.List; + /** * Common interface for high availability (HA) containers. * @@ -50,4 +54,18 @@ public interface HighAvailabilityContainer { * @throws Exception If registration fails or participation in the primary election fails. */ void registAndElect() throws Exception; + + /** + * Used in master-slave mode to obtain information about all currently registered AMS nodes. + * + * @return List + */ + List getAliveNodes(); + + /** + * Used to determine whether the current AMS node is the primary node. + * + * @return + */ + boolean hasLeadership(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java index ef55b9ac7d..8772121f0a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java @@ -18,9 +18,11 @@ package org.apache.amoro.server.ha; +import org.apache.amoro.client.AmsServerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.CountDownLatch; /** No-op HA container that never blocks and performs no leader election. 
*/ @@ -49,4 +51,14 @@ public void close() { @Override public void registAndElect() throws Exception {} + + @Override + public List getAliveNodes() { + return List.of(); + } + + @Override + public boolean hasLeadership() { + return false; + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java index de25d4901d..c0315fc976 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java @@ -46,6 +46,8 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -55,13 +57,27 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L private final LeaderLatch leaderLatch; private final CuratorFramework zkClient; + + // Package-private accessors for testing + CuratorFramework getZkClient() { + return zkClient; + } + + LeaderLatch getLeaderLatch() { + return leaderLatch; + } + private final String tableServiceMasterPath; private final String optimizingServiceMasterPath; + private final String nodesPath; private final AmsServerInfo tableServiceServerInfo; private final AmsServerInfo optimizingServiceServerInfo; + private final boolean isMasterSlaveMode; private volatile CountDownLatch followerLatch; + private String registeredNodePath; public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exception { + this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE); if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) { String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS); int zkSessionTimeout = @@ -71,6 +87,7 @@ public 
ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME); tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName); optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName); + nodesPath = AmsHAProperties.getNodesPath(haClusterName); ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); setupZookeeperAuth(serviceConfig); this.zkClient = @@ -83,6 +100,7 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio zkClient.start(); createPathIfNeeded(tableServiceMasterPath); createPathIfNeeded(optimizingServiceMasterPath); + createPathIfNeeded(nodesPath); String leaderPath = AmsHAProperties.getLeaderPath(haClusterName); createPathIfNeeded(leaderPath); leaderLatch = new LeaderLatch(zkClient, leaderPath); @@ -103,8 +121,10 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio zkClient = null; tableServiceMasterPath = null; optimizingServiceMasterPath = null; + nodesPath = null; tableServiceServerInfo = null; optimizingServiceServerInfo = null; + registeredNodePath = null; // block follower latch forever when ha is disabled followerLatch = new CountDownLatch(1); } @@ -142,7 +162,24 @@ public void waitLeaderShip() throws Exception { @Override public void registAndElect() throws Exception { - // TODO Here you can register for AMS and participate in the election. 
+ if (!isMasterSlaveMode) { + LOG.debug("Master-slave mode is not enabled, skip node registration"); + return; + } + if (zkClient == null || nodesPath == null) { + LOG.warn("HA is not enabled, skip node registration"); + return; + } + // Register node to ZK using ephemeral node + // The node will be automatically deleted when the session expires + String nodeInfo = JacksonUtil.toJSONString(tableServiceServerInfo); + registeredNodePath = + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8)); + LOG.info("Registered AMS node to ZK: {}", registeredNodePath); } @Override @@ -158,6 +195,18 @@ public void waitFollowerShip() throws Exception { public void close() { if (leaderLatch != null) { try { + // Unregister node from ZK + if (registeredNodePath != null) { + try { + zkClient.delete().forPath(registeredNodePath); + LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath); + } catch (KeeperException.NoNodeException e) { + // Node already deleted, ignore + LOG.debug("Node {} already deleted", registeredNodePath); + } catch (Exception e) { + LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e); + } + } this.leaderLatch.close(); this.zkClient.close(); } catch (IOException e) { @@ -192,6 +241,60 @@ private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restB return amsServerInfo; } + /** + * Get list of alive nodes. Only the leader node can call this method. 
+ * + * @return List of alive node information + */ + public List getAliveNodes() { + List aliveNodes = new ArrayList<>(); + if (!isMasterSlaveMode) { + LOG.debug("Master-slave mode is not enabled, return empty node list"); + return aliveNodes; + } + if (zkClient == null || nodesPath == null) { + LOG.warn("HA is not enabled, return empty node list"); + return aliveNodes; + } + if (!leaderLatch.hasLeadership()) { + LOG.warn("Only leader node can get alive nodes list"); + return aliveNodes; + } + try { + List nodePaths = zkClient.getChildren().forPath(nodesPath); + for (String nodePath : nodePaths) { + try { + String fullPath = nodesPath + "/" + nodePath; + byte[] data = zkClient.getData().forPath(fullPath); + if (data != null && data.length > 0) { + String nodeInfoJson = new String(data, StandardCharsets.UTF_8); + AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class); + aliveNodes.add(nodeInfo); + } + } catch (Exception e) { + LOG.warn("Failed to get node info for path: {}", nodePath, e); + } + } + } catch (KeeperException.NoNodeException e) { + LOG.debug("Nodes path {} does not exist", nodesPath); + } catch (Exception e) { + throw new RuntimeException(e); + } + return aliveNodes; + } + + /** + * Check if current node is the leader. 
+ * + * @return true if current node is the leader, false otherwise + */ + public boolean hasLeadership() { + if (leaderLatch == null) { + return false; + } + return leaderLatch.hasLeadership(); + } + private void createPathIfNeeded(String path) throws Exception { try { zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java index c3ce95d748..b7f7723732 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java @@ -107,6 +107,20 @@ int renewLease( HaLeaseMeta selectLease( @Param("clusterName") String clusterName, @Param("serviceName") String serviceName); + /** + * Select all leases for cluster and service. + * + * @param clusterName cluster name + * @param serviceName service name + * @return list of lease rows + */ + @Select( + "SELECT cluster_name, service_name, node_id, node_ip, server_info_json, lease_expire_ts, version, updated_at " + + "FROM ha_lease WHERE cluster_name = #{clusterName} AND service_name = #{serviceName}") + @ResultMap("HaLeaseMetaMap") + List selectLeasesByService( + @Param("clusterName") String clusterName, @Param("serviceName") String serviceName); + /** * Select current lease for cluster and service. * diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java new file mode 100644 index 0000000000..b0ae50fcf7 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.ha; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.amoro.client.AmsServerInfo; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.properties.AmsHAProperties; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat; +import org.apache.amoro.utils.JacksonUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** Test for HighAvailabilityContainer using mocked ZK 
to avoid connection issues. */ +public class TestZkHighAvailabilityContainer { + + private Configurations serviceConfig; + private HighAvailabilityContainer haContainer; + private MockZkState mockZkState; + private CuratorFramework mockZkClient; + private LeaderLatch mockLeaderLatch; + + @Before + public void setUp() throws Exception { + mockZkState = new MockZkState(); + mockZkClient = createMockZkClient(); + mockLeaderLatch = createMockLeaderLatch(); + + // Create test configuration + serviceConfig = new Configurations(); + serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1"); + serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 1260); + serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, 1261); + serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630); + serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true); + serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, "127.0.0.1:2181"); + serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME, "test-cluster"); + } + + @After + public void tearDown() throws Exception { + if (haContainer != null) { + haContainer.close(); + } + mockZkState.clear(); + } + + @Test + public void testRegistAndElectWithoutMasterSlaveMode() throws Exception { + // Test that node registration is skipped when master-slave mode is disabled + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false); + haContainer = createContainerWithMockZk(); + + // Should not throw exception and should not register node + haContainer.registAndElect(); + + // Verify no node was registered + String nodesPath = AmsHAProperties.getNodesPath("test-cluster"); + List children = mockZkState.getChildren(nodesPath); + Assert.assertEquals( + "No nodes should be registered when master-slave mode is disabled", 0, children.size()); + } + + @Test + public void testRegistAndElectWithMasterSlaveMode() throws Exception { + // Test that node 
registration works when master-slave mode is enabled + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + haContainer = createContainerWithMockZk(); + + // Register node + haContainer.registAndElect(); + + // Verify node was registered + String nodesPath = AmsHAProperties.getNodesPath("test-cluster"); + List children = mockZkState.getChildren(nodesPath); + Assert.assertEquals("One node should be registered", 1, children.size()); + + // Verify node data + String nodePath = nodesPath + "/" + children.get(0); + byte[] data = mockZkState.getData(nodePath); + Assert.assertNotNull("Node data should not be null", data); + Assert.assertTrue("Node data should not be empty", data.length > 0); + + // Verify node info + String nodeInfoJson = new String(data, StandardCharsets.UTF_8); + AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class); + Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost()); + Assert.assertEquals( + "Thrift port should match", Integer.valueOf(1260), nodeInfo.getThriftBindPort()); + } + + @Test + public void testGetAliveNodesWithoutMasterSlaveMode() throws Exception { + // Test that getAliveNodes returns empty list when master-slave mode is disabled + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false); + haContainer = createContainerWithMockZk(); + + List aliveNodes = haContainer.getAliveNodes(); + Assert.assertNotNull("Alive nodes list should not be null", aliveNodes); + Assert.assertEquals( + "Alive nodes list should be empty when master-slave mode is disabled", + 0, + aliveNodes.size()); + } + + @Test + public void testGetAliveNodesWhenNotLeader() throws Exception { + // Test that getAliveNodes returns empty list when not leader + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + mockLeaderLatch = createMockLeaderLatch(false); // Not leader + haContainer = createContainerWithMockZk(); + + // Register node + 
haContainer.registAndElect(); + + // Since we're not the leader, should return empty list + List aliveNodes = haContainer.getAliveNodes(); + Assert.assertNotNull("Alive nodes list should not be null", aliveNodes); + Assert.assertEquals("Alive nodes list should be empty when not leader", 0, aliveNodes.size()); + } + + @Test + public void testGetAliveNodesAsLeader() throws Exception { + // Test that getAliveNodes returns nodes when leader + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + mockLeaderLatch = createMockLeaderLatch(true); // Is leader + haContainer = createContainerWithMockZk(); + + // Register node + haContainer.registAndElect(); + + // Verify we are leader + Assert.assertTrue("Should be leader", haContainer.hasLeadership()); + + // Get alive nodes + List aliveNodes = haContainer.getAliveNodes(); + Assert.assertNotNull("Alive nodes list should not be null", aliveNodes); + Assert.assertEquals("Should have one alive node", 1, aliveNodes.size()); + + // Verify node info + AmsServerInfo nodeInfo = aliveNodes.get(0); + Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost()); + Assert.assertEquals( + "Thrift port should match", Integer.valueOf(1260), nodeInfo.getThriftBindPort()); + Assert.assertEquals( + "HTTP port should match", Integer.valueOf(1630), nodeInfo.getRestBindPort()); + } + + @Test + public void testGetAliveNodesWithMultipleNodes() throws Exception { + // Test that getAliveNodes returns all registered nodes + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + mockLeaderLatch = createMockLeaderLatch(true); // Is leader + haContainer = createContainerWithMockZk(); + + // Register first node + haContainer.registAndElect(); + + // Verify first node was registered + String nodesPath = AmsHAProperties.getNodesPath("test-cluster"); + List childrenAfterFirst = mockZkState.getChildren(nodesPath); + Assert.assertEquals("First node should be registered", 1, childrenAfterFirst.size()); 
+ + // Register second node manually in mock state + // Use createNode with sequential path to get the correct sequence number + AmsServerInfo nodeInfo2 = new AmsServerInfo(); + nodeInfo2.setHost("127.0.0.2"); + nodeInfo2.setThriftBindPort(1262); + nodeInfo2.setRestBindPort(1631); + String nodeInfo2Json = JacksonUtil.toJSONString(nodeInfo2); + // Use sequential path ending with "-" to let createNode generate the sequence number + // This ensures the second node gets the correct sequence number (0000000001) + mockZkState.createNode(nodesPath + "/node-", nodeInfo2Json.getBytes(StandardCharsets.UTF_8)); + + // Get alive nodes + List aliveNodes = haContainer.getAliveNodes(); + Assert.assertNotNull("Alive nodes list should not be null", aliveNodes); + Assert.assertEquals("Should have two alive nodes", 2, aliveNodes.size()); + } + + @Test + public void testCloseUnregistersNode() throws Exception { + // Test that close() unregisters the node + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + haContainer = createContainerWithMockZk(); + + // Register node + haContainer.registAndElect(); + + // Verify node was registered + String nodesPath = AmsHAProperties.getNodesPath("test-cluster"); + List children = mockZkState.getChildren(nodesPath); + Assert.assertEquals("One node should be registered", 1, children.size()); + + // Close container + haContainer.close(); + haContainer = null; + + // Verify node was unregistered + List childrenAfterClose = mockZkState.getChildren(nodesPath); + Assert.assertEquals("No nodes should be registered after close", 0, childrenAfterClose.size()); + } + + @Test + public void testHasLeadership() throws Exception { + // Test hasLeadership() method + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + mockLeaderLatch = createMockLeaderLatch(false); // Not leader initially + haContainer = createContainerWithMockZk(); + + // Initially should not be leader + Assert.assertFalse("Should not be leader 
initially", haContainer.hasLeadership()); + + // Change to leader + mockLeaderLatch = createMockLeaderLatch(true); + haContainer = createContainerWithMockZk(); + + // Should be leader now + Assert.assertTrue("Should be leader", haContainer.hasLeadership()); + } + + @Test + public void testRegistAndElectWithoutHAEnabled() throws Exception { + // Test that registAndElect skips when HA is not enabled + serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false); + serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true); + haContainer = new ZkHighAvailabilityContainer(serviceConfig); + + // Should not throw exception + haContainer.registAndElect(); + } + + /** Create HighAvailabilityContainer with mocked ZK components using reflection. */ + private HighAvailabilityContainer createContainerWithMockZk() throws Exception { + // Create container without ZK connection to avoid any connection attempts + HighAvailabilityContainer container = createContainerWithoutZk(); + + // Inject mock ZK client and leader latch (fields are on ZkHighAvailabilityContainer) + java.lang.reflect.Field zkClientField = + ZkHighAvailabilityContainer.class.getDeclaredField("zkClient"); + zkClientField.setAccessible(true); + zkClientField.set(container, mockZkClient); + + java.lang.reflect.Field leaderLatchField = + ZkHighAvailabilityContainer.class.getDeclaredField("leaderLatch"); + leaderLatchField.setAccessible(true); + leaderLatchField.set(container, mockLeaderLatch); + + // Note: We don't need to create the paths themselves as nodes in ZK + // ZK paths are logical containers, not actual nodes + // The createPathIfNeeded() calls will be handled by the mock when needed + + return container; + } + + /** + * Create a HighAvailabilityContainer without initializing ZK connection. This is used when we + * want to completely avoid ZK connection attempts. + * + *

Uses ZkHighAvailabilityContainer (which has the constructor and fields); + * HighAvailabilityContainer is an interface without constructors or instance fields. + */ + private HighAvailabilityContainer createContainerWithoutZk() throws Exception { + // ZkHighAvailabilityContainer has constructor (Configurations); HighAvailabilityContainer is an + // interface + java.lang.reflect.Constructor constructor = + ZkHighAvailabilityContainer.class.getDeclaredConstructor(Configurations.class); + + // Create a minimal config that disables HA to avoid ZK connection + Configurations tempConfig = new Configurations(serviceConfig); + tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false); + + HighAvailabilityContainer container = constructor.newInstance(tempConfig); + + // Now set all required fields using reflection (fields are on ZkHighAvailabilityContainer) + java.lang.reflect.Field isMasterSlaveModeField = + ZkHighAvailabilityContainer.class.getDeclaredField("isMasterSlaveMode"); + isMasterSlaveModeField.setAccessible(true); + isMasterSlaveModeField.set( + container, serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)); + + if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) { + String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME); + + java.lang.reflect.Field tableServiceMasterPathField = + ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceMasterPath"); + tableServiceMasterPathField.setAccessible(true); + tableServiceMasterPathField.set( + container, AmsHAProperties.getTableServiceMasterPath(haClusterName)); + + java.lang.reflect.Field optimizingServiceMasterPathField = + ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceMasterPath"); + optimizingServiceMasterPathField.setAccessible(true); + optimizingServiceMasterPathField.set( + container, AmsHAProperties.getOptimizingServiceMasterPath(haClusterName)); + + java.lang.reflect.Field nodesPathField = + 
ZkHighAvailabilityContainer.class.getDeclaredField("nodesPath"); + nodesPathField.setAccessible(true); + nodesPathField.set(container, AmsHAProperties.getNodesPath(haClusterName)); + + java.lang.reflect.Field tableServiceServerInfoField = + ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceServerInfo"); + tableServiceServerInfoField.setAccessible(true); + AmsServerInfo tableServiceServerInfo = + buildServerInfo( + serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST), + serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT), + serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT)); + tableServiceServerInfoField.set(container, tableServiceServerInfo); + + java.lang.reflect.Field optimizingServiceServerInfoField = + ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceServerInfo"); + optimizingServiceServerInfoField.setAccessible(true); + AmsServerInfo optimizingServiceServerInfo = + buildServerInfo( + serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST), + serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT), + serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT)); + optimizingServiceServerInfoField.set(container, optimizingServiceServerInfo); + } + + return container; + } + + /** Helper method to build AmsServerInfo (copied from HighAvailabilityContainer). */ + private AmsServerInfo buildServerInfo(String host, Integer thriftPort, Integer httpPort) { + AmsServerInfo serverInfo = new AmsServerInfo(); + serverInfo.setHost(host); + serverInfo.setThriftBindPort(thriftPort); + serverInfo.setRestBindPort(httpPort); + return serverInfo; + } + + /** Create a mock CuratorFramework that uses MockZkState for storage. 
*/
+  @SuppressWarnings("unchecked")
+  private CuratorFramework createMockZkClient() throws Exception {
+    CuratorFramework mockClient = mock(CuratorFramework.class);
+
+    // Mock getChildren() - forPath(path) is answered from the in-memory mockZkState
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+        getChildrenBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .GetChildrenBuilder.class);
+    when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
+    when(getChildrenBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getChildren(path);
+            });
+
+    // Mock getData() - forPath(path) returns the bytes stored in mockZkState for that path
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+        getDataBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+                    .class);
+    when(mockClient.getData()).thenReturn(getDataBuilder);
+    when(getDataBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getData(path);
+            });
+
+    // Mock create() - manually create the entire fluent API chain to ensure consistency
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder createBuilder =
+        mock(
+            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);
+
+    // The raw-class mock assigned to a parameterized type is an unchecked conversion; it is
+    // covered by the method-level @SuppressWarnings("unchecked").
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+            .ProtectACLCreateModeStatPathAndBytesable<
+                String>
+        pathAndBytesable =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ProtectACLCreateModeStatPathAndBytesable.class);
+
+    when(mockClient.create()).thenReturn(createBuilder);
+
+    // Mock the chain: creatingParentsIfNeeded() -> withMode() -> forPath()
+    // Use the same mock object for the entire chain
+    when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
+    when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);
+
+    // Mock forPath(path, data) - used by registAndElect()
+    when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              byte[] data = invocation.getArgument(1);
+              return mockZkState.createNode(path, data);
+            });
+
+    // Mock forPath(path) - used by createPathIfNeeded()
+    // Note: createPathIfNeeded() creates paths without data, but we still need to store them
+    // so that getChildren() can work correctly
+    when(pathAndBytesable.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              // Create the path as an empty node (this simulates ZK path creation)
+              // In real ZK, paths are logical containers, but we need to store them
+              // to make getChildren() work correctly
+              if (mockZkState.exists(path) == null) {
+                mockZkState.createNode(path, new byte[0]);
+              }
+              return null;
+            });
+
+    // Mock delete() - forPath(path) removes the node from mockZkState
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder deleteBuilder =
+        mock(
+            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
+    when(mockClient.delete()).thenReturn(deleteBuilder);
+    doAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              mockZkState.deleteNode(path);
+              return null;
+            })
+        .when(deleteBuilder)
+        .forPath(anyString());
+
+    // Mock checkExists() - forPath(path) yields a Stat when the node is stored, otherwise null
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+        checkExistsBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+                    .class);
+    when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
+    when(checkExistsBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.exists(path);
+            });
+
+    // Mock start() and close() as no-ops
+    doAnswer(invocation -> null).when(mockClient).start();
+    doAnswer(invocation -> null).when(mockClient).close();
+
+    return mockClient;
+  }
+
+  /** Create a mock LeaderLatch that reports it holds leadership. */
+  private LeaderLatch createMockLeaderLatch() throws Exception {
+    return createMockLeaderLatch(true);
+  }
+
+  /**
+   * Create a mock LeaderLatch with specified leadership status.
+   *
+   * @param hasLeadership the value the mock returns from {@code hasLeadership()}
+   * @return a latch whose addListener/start/close/await calls are stubbed to do nothing
+   */
+  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
+    LeaderLatch mockLatch = mock(LeaderLatch.class);
+    when(mockLatch.hasLeadership()).thenReturn(hasLeadership);
+    doAnswer(invocation -> null).when(mockLatch).addListener(any());
+    doAnswer(invocation -> null).when(mockLatch).start();
+    doAnswer(invocation -> null).when(mockLatch).close();
+    // Mock await() - it throws IOException and InterruptedException
+    doAnswer(
+            invocation -> {
+              // Mock implementation - doesn't actually wait
+              return null;
+            })
+        .when(mockLatch)
+        .await();
+    return mockLatch;
+  }
+
+  /**
+   * In-memory ZK state simulator.
+   *
+   * <p>Nodes are kept in a plain map from full path to payload bytes; there is no tree structure,
+   * so parent/child relationships are derived from path prefixes.
+   */
+  private static class MockZkState {
+    /** Full node path -> node payload. */
+    private final Map<String, byte[]> nodes = new HashMap<>();
+
+    /** Source of the numeric suffix appended to sequential node paths. */
+    private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+    /**
+     * Lists the names of the direct children stored under {@code path}.
+     *
+     * @param path parent path, with or without a trailing slash
+     * @return child names (relative to {@code path}) in natural String order; empty when no stored
+     *     node lies directly under the path
+     */
+    public List<String> getChildren(String path) throws KeeperException {
+      List<String> children = new ArrayList<>();
+      String prefix = path.endsWith("/") ? path : path + "/";
+      for (String nodePath : nodes.keySet()) {
+        // Only include direct children (not the path itself, and not nested paths)
+        if (nodePath.startsWith(prefix) && !nodePath.equals(path)) {
+          String relativePath = nodePath.substring(prefix.length());
+          // Only add direct children (no additional slashes)
+          // This means the path should be exactly: prefix + relativePath
+          if (!relativePath.contains("/")) {
+            children.add(relativePath);
+          }
+        }
+      }
+      // Sort to ensure consistent ordering
+      children.sort(String::compareTo);
+      return children;
+    }
+
+    /**
+     * Returns the payload stored at {@code path}.
+     *
+     * @throws KeeperException.NoNodeException if nothing is stored for the path
+     */
+    public byte[] getData(String path) throws KeeperException {
+      byte[] data = nodes.get(path);
+      if (data == null) {
+        throw new KeeperException.NoNodeException(path);
+      }
+      return data;
+    }
+
+    /**
+     * Stores {@code data} at {@code path}, replacing any existing entry.
+     *
+     * <p>A path ending in {@code "-"} is treated as a sequential node: a zero-padded 10-digit
+     * counter value is appended before storing.
+     *
+     * @return the path actually used as the key (including any appended sequence number)
+     */
+    public String createNode(String path, byte[] data) {
+      // Handle sequential nodes
+      if (path.endsWith("-")) {
+        int seq = sequenceCounter.incrementAndGet();
+        path = path + String.format("%010d", seq);
+      }
+      nodes.put(path, data);
+      return path;
+    }
+
+    /**
+     * Removes the node stored at {@code path}.
+     *
+     * @throws KeeperException.NoNodeException if the path is not present
+     */
+    public void deleteNode(String path) throws KeeperException {
+      if (!nodes.containsKey(path)) {
+        throw new KeeperException.NoNodeException(path);
+      }
+      nodes.remove(path);
+    }
+
+    /** Returns a fresh {@link Stat} when {@code path} is stored, or {@code null} when it is not. */
+    public Stat exists(String path) {
+      return nodes.containsKey(path) ? new Stat() : null;
+    }
+
+    /** Drops every stored node and resets the sequential-node counter to zero. */
+    public void clear() {
+      nodes.clear();
+      sequenceCounter.set(0);
+    }
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
index e794b520da..08b9ef04a4 100644
--- a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
@@ -25,6 +25,7 @@ public class AmsHAProperties {
   private static final String LEADER_PATH = "/leader";
   private static final String TABLE_SERVICE_MASTER_PATH = "/master";
   private static final String OPTIMIZING_SERVICE_MASTER_PATH = "/optimizing-service-master";
+  private static final String NODES_PATH = "/nodes";
   private static final String NAMESPACE_DEFAULT = "default";
 
   private static String getBasePath(String namespace) {
@@ -45,4 +46,8 @@ public static String getOptimizingServiceMasterPath(String namespace) {
   public static String getLeaderPath(String namespace) {
     return getBasePath(namespace) + LEADER_PATH;
   }
+
+  public static String getNodesPath(String namespace) {
+    return getBasePath(namespace) + NODES_PATH;
+  }
 }