Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -135,6 +137,44 @@ public void waitFollowerShip() throws InterruptedException {
LOG.info("Became the follower of AMS (Database lease)");
}

@Override
public void registAndElect() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the method name to registerAndElect.

boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
if (!isMasterSlaveMode) {
LOG.debug("Master-slave mode is not enabled, skip node registration");
return;
}
// In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info
// This is similar to ZK mode registering ephemeral nodes
long now = System.currentTimeMillis();
String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
try {
doAsIgnoreError(
HaLeaseMapper.class,
mapper -> {
int updated =
mapper.updateServerInfo(
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
if (updated == 0) {
mapper.insertServerInfoIfAbsent(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't lease_expire_ts set to the registration time when the node first registers?

clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
}
});
LOG.info(
"Registered AMS node to database: nodeId={}, optimizingService={}",
nodeId,
optimizingServiceServerInfo);
} catch (Exception e) {
LOG.error("Failed to register node to database", e);
throw e;
}
}

/**
 * Reports whether this node currently considers itself the leader.
 *
 * @return the current value of the {@code isLeader} flag
 */
@Override
public boolean hasLeadership() {
return isLeader.get();
}

/** Closes the heartbeat executor safely. */
@Override
public void close() {
Expand All @@ -147,9 +187,6 @@ public void close() {
}
}

/** No-op: the body is empty, so this variant performs no registration or election. */
@Override
public void registAndElect() throws Exception {}

private class HeartbeatRunnable implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -304,6 +341,40 @@ private void onLeaderLost() {
}
}

/**
 * Collects the server info of every node whose lease row for {@code OPTIMIZING_SERVICE} has not
 * expired yet.
 *
 * <p>A non-leader gets an empty list (and a warning is logged). Rows without an expiry
 * timestamp, with an expiry that is not strictly in the future, or with missing/empty server
 * info JSON are skipped. Rows whose JSON cannot be parsed are skipped with a warning.
 *
 * @return the parsed server info of all nodes holding a valid lease; empty when this node is not
 *     the leader
 */
@Override
public List<AmsServerInfo> getAliveNodes() {
  List<AmsServerInfo> result = new ArrayList<>();
  if (!isLeader.get()) {
    LOG.warn("Only leader node can get alive nodes list");
    return result;
  }
  try {
    long now = System.currentTimeMillis();
    List<HaLeaseMeta> rows =
        getAs(
            HaLeaseMapper.class,
            mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE));
    for (HaLeaseMeta row : rows) {
      // Skip rows whose lease is absent or no longer valid.
      if (row.getLeaseExpireTs() == null || row.getLeaseExpireTs() <= now) {
        continue;
      }
      // Skip rows that carry no server info payload.
      if (row.getServerInfoJson() == null || row.getServerInfoJson().isEmpty()) {
        continue;
      }
      try {
        result.add(JacksonUtil.parseObject(row.getServerInfoJson(), AmsServerInfo.class));
      } catch (Exception e) {
        LOG.warn("Failed to parse server info for node {}", row.getNodeId(), e);
      }
    }
  } catch (Exception e) {
    LOG.error("Failed to get alive nodes from database", e);
    throw e;
  }
  return result;
}

private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
AmsServerInfo amsServerInfo = new AmsServerInfo();
amsServerInfo.setHost(host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.amoro.server.ha;

import org.apache.amoro.client.AmsServerInfo;

import java.util.List;

/**
* Common interface for high availability (HA) containers.
*
Expand Down Expand Up @@ -50,4 +54,18 @@ public interface HighAvailabilityContainer {
* @throws Exception If registration fails or participation in the primary election fails.
*/
void registAndElect() throws Exception;

/**
 * Used in master-slave mode to obtain information about all currently registered AMS nodes.
 *
 * @return a list of {@link AmsServerInfo}, one entry per registered AMS node; may be empty
 */
List<AmsServerInfo> getAliveNodes();

/**
 * Used to determine whether the current AMS node is the primary node.
 *
 * @return {@code true} if the current AMS node is the primary node, {@code false} otherwise
 */
boolean hasLeadership();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.amoro.server.ha;

import org.apache.amoro.client.AmsServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/** No-op HA container that never blocks and performs no leader election. */
Expand Down Expand Up @@ -49,4 +51,14 @@ public void close() {

/** No-op: this container performs no node registration and no election. */
@Override
public void registAndElect() throws Exception {}

/**
 * Always reports no alive nodes.
 *
 * @return an empty, unmodifiable list
 */
@Override
public List<AmsServerInfo> getAliveNodes() {
return List.of();
}

/**
 * Always reports that this node does not hold leadership.
 *
 * @return {@code false}, unconditionally
 */
@Override
public boolean hasLeadership() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

Expand All @@ -55,13 +57,27 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L

private final LeaderLatch leaderLatch;
private final CuratorFramework zkClient;

// Package-private accessors for testing
/**
 * Exposes the Curator client held by this container.
 *
 * @return the {@code zkClient} field; {@code null} when the constructor took the HA-disabled
 *     branch
 */
CuratorFramework getZkClient() {
return zkClient;
}

/**
 * Exposes the leader latch held by this container (package-private, intended for tests).
 *
 * @return the {@code leaderLatch} field; NOTE(review): other methods in this class null-check
 *     this field, so it is presumably {@code null} when HA is disabled - confirm
 */
LeaderLatch getLeaderLatch() {
return leaderLatch;
}

private final String tableServiceMasterPath;
private final String optimizingServiceMasterPath;
private final String nodesPath;
private final AmsServerInfo tableServiceServerInfo;
private final AmsServerInfo optimizingServiceServerInfo;
private final boolean isMasterSlaveMode;
private volatile CountDownLatch followerLatch;
private String registeredNodePath;

public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exception {
this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
int zkSessionTimeout =
Expand All @@ -71,6 +87,7 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName);
optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
nodesPath = AmsHAProperties.getNodesPath(haClusterName);
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
setupZookeeperAuth(serviceConfig);
this.zkClient =
Expand All @@ -83,6 +100,7 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
zkClient.start();
createPathIfNeeded(tableServiceMasterPath);
createPathIfNeeded(optimizingServiceMasterPath);
createPathIfNeeded(nodesPath);
String leaderPath = AmsHAProperties.getLeaderPath(haClusterName);
createPathIfNeeded(leaderPath);
leaderLatch = new LeaderLatch(zkClient, leaderPath);
Expand All @@ -103,8 +121,10 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
zkClient = null;
tableServiceMasterPath = null;
optimizingServiceMasterPath = null;
nodesPath = null;
tableServiceServerInfo = null;
optimizingServiceServerInfo = null;
registeredNodePath = null;
// block follower latch forever when ha is disabled
followerLatch = new CountDownLatch(1);
}
Expand Down Expand Up @@ -142,7 +162,24 @@ public void waitLeaderShip() throws Exception {

/**
 * Registers this AMS node under the cluster's nodes path in ZooKeeper.
 *
 * <p>Nothing happens unless master-slave mode is enabled and a ZooKeeper client exists. The node
 * is written as an {@code EPHEMERAL_SEQUENTIAL} znode whose data is the UTF-8 JSON of the
 * table-service server info; the created path is remembered in {@code registeredNodePath}.
 *
 * <p>The call is idempotent while the previously created znode still exists: a sequential create
 * always produces a new znode, so without this check a repeated call would list this server twice
 * and lose track of the first znode.
 *
 * @throws Exception if a ZooKeeper operation fails
 */
@Override
public void registAndElect() throws Exception {
  if (!isMasterSlaveMode) {
    LOG.debug("Master-slave mode is not enabled, skip node registration");
    return;
  }
  if (zkClient == null || nodesPath == null) {
    LOG.warn("HA is not enabled, skip node registration");
    return;
  }
  // Re-register only when the earlier znode is gone (e.g. removed after a session expiry).
  if (registeredNodePath != null && zkClient.checkExists().forPath(registeredNodePath) != null) {
    LOG.debug("AMS node is already registered to ZK: {}", registeredNodePath);
    return;
  }
  // Register node to ZK using ephemeral node
  // The node will be automatically deleted when the session expires
  String nodeInfo = JacksonUtil.toJSONString(tableServiceServerInfo);
  registeredNodePath =
      zkClient
          .create()
          .creatingParentsIfNeeded()
          .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
          .forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8));
  LOG.info("Registered AMS node to ZK: {}", registeredNodePath);
}

@Override
Expand All @@ -158,6 +195,18 @@ public void waitFollowerShip() throws Exception {
public void close() {
if (leaderLatch != null) {
try {
// Unregister node from ZK
if (registeredNodePath != null) {
try {
zkClient.delete().forPath(registeredNodePath);
LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath);
} catch (KeeperException.NoNodeException e) {
// Node already deleted, ignore
LOG.debug("Node {} already deleted", registeredNodePath);
} catch (Exception e) {
LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e);
}
}
this.leaderLatch.close();
this.zkClient.close();
} catch (IOException e) {
Expand Down Expand Up @@ -192,6 +241,60 @@ private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restB
return amsServerInfo;
}

/**
 * Lists the AMS nodes currently registered under the nodes path in ZooKeeper.
 *
 * <p>An empty list is returned when master-slave mode is disabled, when HA is disabled, when this
 * node does not hold leadership, or when the nodes path does not exist. Children whose data is
 * empty are skipped silently; children that cannot be read or parsed are skipped with a warning.
 *
 * @return the parsed server info of every readable registered node; never {@code null}
 * @throws RuntimeException if listing the children fails, or if the calling thread is interrupted
 *     (the interrupt flag is restored before throwing)
 */
@Override
public List<AmsServerInfo> getAliveNodes() {
  List<AmsServerInfo> aliveNodes = new ArrayList<>();
  if (!isMasterSlaveMode) {
    LOG.debug("Master-slave mode is not enabled, return empty node list");
    return aliveNodes;
  }
  if (zkClient == null || nodesPath == null) {
    LOG.warn("HA is not enabled, return empty node list");
    return aliveNodes;
  }
  if (!leaderLatch.hasLeadership()) {
    LOG.warn("Only leader node can get alive nodes list");
    return aliveNodes;
  }
  try {
    List<String> nodePaths = zkClient.getChildren().forPath(nodesPath);
    for (String nodePath : nodePaths) {
      try {
        String fullPath = nodesPath + "/" + nodePath;
        byte[] data = zkClient.getData().forPath(fullPath);
        if (data != null && data.length > 0) {
          String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
          AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class);
          aliveNodes.add(nodeInfo);
        }
      } catch (InterruptedException e) {
        // Do not treat an interrupt as a bad node; let the outer handler deal with it.
        throw e;
      } catch (Exception e) {
        LOG.warn("Failed to get node info for path: {}", nodePath, e);
      }
    }
  } catch (KeeperException.NoNodeException e) {
    LOG.debug("Nodes path {} does not exist", nodesPath);
  } catch (InterruptedException e) {
    // Restore the flag so callers further up can observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while getting alive nodes from " + nodesPath, e);
  } catch (Exception e) {
    throw new RuntimeException("Failed to get alive nodes from " + nodesPath, e);
  }
  return aliveNodes;
}

/**
 * Check if current node is the leader.
 *
 * @return true if current node is the leader; false otherwise, including when no leader latch
 *     exists
 */
@Override
public boolean hasLeadership() {
  return leaderLatch != null && leaderLatch.hasLeadership();
}

private void createPathIfNeeded(String path) throws Exception {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ int renewLease(
HaLeaseMeta selectLease(
@Param("clusterName") String clusterName, @Param("serviceName") String serviceName);

/**
 * Select all leases for cluster and service.
 *
 * <p>The query filters on {@code cluster_name} and {@code service_name} only; it has no
 * predicate on {@code lease_expire_ts}, so expired rows are returned as well and callers must
 * filter them if they only want live leases.
 *
 * @param clusterName cluster name
 * @param serviceName service name
 * @return list of lease rows, mapped through the {@code HaLeaseMetaMap} result map
 */
@Select(
"SELECT cluster_name, service_name, node_id, node_ip, server_info_json, lease_expire_ts, version, updated_at "
+ "FROM ha_lease WHERE cluster_name = #{clusterName} AND service_name = #{serviceName}")
@ResultMap("HaLeaseMetaMap")
List<HaLeaseMeta> selectLeasesByService(
@Param("clusterName") String clusterName, @Param("serviceName") String serviceName);

/**
* Select current lease for cluster and service.
*
Expand Down
Loading