diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index be51f96e4..58bf0dbab 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -3,7 +3,6 @@ """Utility classes and methods for getting cluster info, configuration info and suggestions.""" import logging -from random import choice from typing import Dict, List, Optional from charms.opensearch.v0.helper_enums import BaseStrEnum @@ -50,12 +49,11 @@ def suggest_roles(nodes: List[Node], planned_units: int) -> List[str]: max_cms = ClusterTopology.max_cluster_manager_nodes(planned_units) base_roles = ["data", "ingest", "ml", "coordinating_only"] - + full_roles = base_roles + ["cluster_manager"] nodes_by_roles = ClusterTopology.nodes_count_by_role(nodes) if nodes_by_roles.get("cluster_manager", 0) == max_cms: return base_roles - - return base_roles + ["cluster_manager"] + return full_roles @staticmethod def get_cluster_settings( @@ -77,106 +75,39 @@ def get_cluster_settings( @staticmethod def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]: """Recompute the configuration of all the nodes (cluster set to auto-generate roles).""" - # in case the cluster nodes' roles were previously "manually generated" - we need - # to reset the roles to their expected default values so that "roles re-balancing" - # logic (node_with_new_roles) can be safely applied: only change the last node. - # Nothing changes to the conf of the nodes if the roles were previously auto-generated. - valid_nodes = ClusterTopology.refill_node_with_default_roles(app_name, nodes) - - nodes_by_name = dict([(node.name, node) for node in nodes]) - - # compute node with new roles - only returns 1 changed node - updated_node = ClusterTopology.node_with_new_roles(app_name, valid_nodes) - if updated_node: - nodes_by_name[updated_node.name] = updated_node - - return nodes_by_name - - @staticmethod - def refill_node_with_default_roles(app_name: str, nodes: List[Node]) -> List[Node]: - """Refill the roles of a list of nodes with default values for re-computing. - - This method works hand in hand with node_with_new_roles which assumes a clean - base (regarding the auto-generation logic) and only applies changes to 1 node. - """ + if not nodes: + return {} + logger.debug(f"Roles before re-balancing {({node.name: node.roles for node in nodes})=}") + nodes_by_name = {} + current_cluster_nodes = [] + for node in nodes: + if node.app_name == app_name: + current_cluster_nodes.append(node) + else: + # Leave node unchanged + nodes_by_name[node.name] = node base_roles = ["data", "ingest", "ml", "coordinating_only"] full_roles = base_roles + ["cluster_manager"] - - current_cluster_nodes = [node for node in nodes if node.app_name == app_name] - current_cm_eligible = [node for node in current_cluster_nodes if node.is_cm_eligible()] - - # we check if the roles were previously balanced in accordance with the auto-generation - # logic, in which the max difference between CMs and Non-CMs is 1 node (to keep quorum) - unbalanced = len(current_cm_eligible) < len(current_cluster_nodes) - 1 - - updated_nodes = [] + highest_unit_number = max(node.unit_number for node in current_cluster_nodes) for node in current_cluster_nodes: # we do this in order to remove any non-default role / add any missing default role - new_roles = full_roles if unbalanced or node.is_cm_eligible() else base_roles - updated = Node( + if len(current_cluster_nodes) % 2 == 0 and node.unit_number == highest_unit_number: + roles = base_roles + else: + roles = full_roles + + nodes_by_name[node.name] = Node( name=node.name, - roles=new_roles, + roles=roles, ip=node.ip, app_name=node.app_name, + unit_number=node.unit_number, temperature=node.temperature, ) - updated_nodes.append(updated) - - return updated_nodes + [node for node in nodes if node.app_name != app_name] - - @staticmethod - def node_with_new_roles(app_name: str, remaining_nodes: List[Node]) -> Optional[Node]: - """Pick and recompute the roles of the best node to re-balance the cluster. - - Args: - app_name: Name of the (current) cluster's app on which the node changes must happen - Important to have this piece of information, as in a multi-cluster - deployments the "remaining nodes" includes nodes from all the fleet. - remaining_nodes: List of nodes remaining in a cluster (sub-cluster or full-fleet) - """ - max_cms = ClusterTopology.max_cluster_manager_nodes(len(remaining_nodes)) - - nodes_by_roles = ClusterTopology.nodes_by_role(remaining_nodes) - nodes_count_by_roles = ClusterTopology.nodes_count_by_role(remaining_nodes) - current_cms = nodes_count_by_roles.get("cluster_manager", 0) - - # the nodes involved in the voting are intact, do nothing - if current_cms == max_cms: - logger.debug("Suggesting NO changes to the nodes.") - return None - - if current_cms > max_cms: - # remove cm from a node - cm = choice( - [node for node in nodes_by_roles["cluster_manager"] if node.app_name == app_name] - ) - logger.debug(f"Suggesting - removal of 'CM': {cm.name}") - return Node( - name=cm.name, - roles=[r for r in cm.roles if r != "cluster_manager"], - ip=cm.ip, - app_name=app_name, - ) - - # when cm count smaller than expected - data_only_nodes = [ - node for node in nodes_by_roles["data"] if "cluster_manager" not in node.roles - ] - - # no data-only node available to change, leave - if not data_only_nodes: - logger.debug("Suggesting NO changes to the nodes.") - return None - - # add cm to a data only (non cm) node - data = choice([node for node in data_only_nodes if node.app_name == app_name]) - logger.debug(f"Suggesting - Addition of 'CM' to data: {data.name}") - return Node( - name=data.name, - roles=data.roles + ["cluster_manager"], - ip=data.ip, - app_name=app_name, + logger.debug( + f"Roles after re-balancing {({name: node.roles for name, node in nodes_by_name.items()})=}" ) + return nodes_by_name @staticmethod def max_cluster_manager_nodes(planned_units) -> int: @@ -256,6 +187,7 @@ def nodes( roles=obj["roles"], ip=obj["ip"], app_name="-".join(obj["name"].split("-")[:-1]), + unit_number=int(obj["name"].split("-")[-1]), temperature=obj.get("attributes", {}).get("temp"), ) nodes.append(node) diff --git a/lib/charms/opensearch/v0/models.py b/lib/charms/opensearch/v0/models.py index 0b6bf61f0..487d92ce7 100644 --- a/lib/charms/opensearch/v0/models.py +++ b/lib/charms/opensearch/v0/models.py @@ -66,6 +66,7 @@ class Node(Model): roles: List[str] ip: str app_name: str + unit_number: int temperature: Optional[str] = None @classmethod diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index 92635dcaf..62e822b50 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -1200,6 +1200,7 @@ def _compute_and_broadcast_updated_topology(self, current_nodes: List[Node]) -> roles=roles, ip=node.ip, app_name=node.app_name, + unit_number=self.unit_id, temperature=temperature, ) @@ -1302,7 +1303,7 @@ def unit_name(self) -> str: @property def unit_id(self) -> int: """ID of the current unit.""" - return int(self.unit.name.split("/")[1]) + return int(self.unit.name.split("/")[-1]) @property def alt_hosts(self) -> Optional[List[str]]: diff --git a/lib/charms/opensearch/v0/opensearch_distro.py b/lib/charms/opensearch/v0/opensearch_distro.py index 812a00af8..02310e150 100644 --- a/lib/charms/opensearch/v0/opensearch_distro.py +++ b/lib/charms/opensearch/v0/opensearch_distro.py @@ -412,6 +412,7 @@ def current(self) -> Node: roles=current_node["roles"], ip=current_node["ip"], app_name=self._charm.app.name, + unit_number=self._charm.unit_id, temperature=current_node.get("attributes", {}).get("temp"), ) except OpenSearchHttpError: @@ -422,6 +423,7 @@ def current(self) -> Node: roles=conf_on_disk["node.roles"], ip=self._charm.unit_ip, app_name=self._charm.app.name, + unit_number=self._charm.unit_id, temperature=conf_on_disk.get("node.attr.temp"), ) diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py index 23629d133..d50ccc12c 100644 --- a/tests/integration/ha/helpers.py +++ b/tests/integration/ha/helpers.py @@ -205,6 +205,7 @@ async def all_nodes(ops_test: OpsTest, unit_ip: str) -> List[Node]: roles=node["roles"], ip=node["ip"], app_name="-".join(node["name"].split("-")[:-1]), + unit_number=int(node["name"].split("-")[-1]), temperature=node.get("attributes", {}).get("temp"), ) ) diff --git a/tests/unit/lib/test_helper_cluster.py b/tests/unit/lib/test_helper_cluster.py index 93eca237d..6f93dc13f 100644 --- a/tests/unit/lib/test_helper_cluster.py +++ b/tests/unit/lib/test_helper_cluster.py @@ -22,18 +22,56 @@ class TestHelperCluster(unittest.TestCase): def cluster1_5_nodes_conf(self) -> List[Node]: """Returns the expected config of a 5 "planned" nodes cluster.""" return [ - Node(name="cm1", roles=self.cm_roles, ip="0.0.0.1", app_name=self.cluster1), - Node(name="cm2", roles=self.cm_roles, ip="0.0.0.2", app_name=self.cluster1), - Node(name="cm3", roles=self.cm_roles, ip="0.0.0.3", app_name=self.cluster1), - Node(name="cm4", roles=self.cm_roles, ip="0.0.0.4", app_name=self.cluster1), - Node(name="cm5", roles=self.cm_roles, ip="0.0.0.5", app_name=self.cluster1), + Node( + name="cm1", + roles=self.cm_roles, + ip="0.0.0.1", + app_name=self.cluster1, + unit_number=0, + ), + Node( + name="cm2", + roles=self.cm_roles, + ip="0.0.0.2", + app_name=self.cluster1, + unit_number=1, + ), + # Unit number 2 omitted on purpose + # (unit numbers are not guaranteed to be sequential on VM charms) + Node( + name="cm3", + roles=self.cm_roles, + ip="0.0.0.3", + app_name=self.cluster1, + unit_number=3, + ), + Node( + name="cm4", + roles=self.cm_roles, + ip="0.0.0.4", + app_name=self.cluster1, + unit_number=4, + ), + Node( + name="cm5", + roles=self.cm_roles, + ip="0.0.0.5", + app_name=self.cluster1, + unit_number=5, + ), ] def cluster1_6_nodes_conf(self): """Returns the expected config of a 6 "planned" nodes cluster.""" nodes = self.cluster1_5_nodes_conf() nodes.append( - Node(name="data1", roles=self.base_roles, ip="0.0.0.6", app_name=self.cluster1) + Node( + name="data1", + roles=self.base_roles, + ip="0.0.0.6", + app_name=self.cluster1, + unit_number=6, + ) ) return nodes @@ -41,11 +79,41 @@ def cluster2_nodes_conf(self) -> List[Node]: """Returns the expected config of the sub-cluster 2.""" roles = ["cluster_manager", "data", "ml"] return [ - Node(name="cm_data_ml1", roles=roles, ip="0.0.0.11", app_name=self.cluster2), - Node(name="cm_data_ml2", roles=roles, ip="0.0.0.12", app_name=self.cluster2), - Node(name="cm_data_ml3", roles=roles, ip="0.0.0.13", app_name=self.cluster2), - Node(name="cm_data_ml4", roles=roles, ip="0.0.0.14", app_name=self.cluster2), - Node(name="cm_data_ml5", roles=roles, ip="0.0.0.15", app_name=self.cluster2), + Node( + name="cm_data_ml1", + roles=roles, + ip="0.0.0.11", + app_name=self.cluster2, + unit_number=0, + ), + Node( + name="cm_data_ml2", + roles=roles, + ip="0.0.0.12", + app_name=self.cluster2, + unit_number=1, + ), + Node( + name="cm_data_ml3", + roles=roles, + ip="0.0.0.13", + app_name=self.cluster2, + unit_number=2, + ), + Node( + name="cm_data_ml4", + roles=roles, + ip="0.0.0.14", + app_name=self.cluster2, + unit_number=3, + ), + Node( + name="cm_data_ml5", + roles=roles, + ip="0.0.0.15", + app_name=self.cluster2, + unit_number=4, + ), ] def setUp(self) -> None: @@ -82,38 +150,50 @@ def test_topology_roles_suggestion_even_number_of_planned_units(self): ) self.assertCountEqual( - ClusterTopology.suggest_roles(cluster_6_conf[:-1], planned_units), self.base_roles + ClusterTopology.suggest_roles(cluster_6_conf[:-1], planned_units), + self.base_roles, ) def test_auto_recompute_node_roles_in_cluster_6(self): """Test the automatic suggestion of new roles to an existing node.""" - cluster_conf = self.cluster1_6_nodes_conf() + cluster_conf = {node.name: node for node in self.cluster1_6_nodes_conf()} # remove a cluster manager node - computed_node_to_change = ClusterTopology.node_with_new_roles( - app_name=self.cluster1, - remaining_nodes=[node for node in cluster_conf if node.name != "cm1"], + old_cluster_conf = cluster_conf.copy() + old_cluster_conf.pop("cm1") + new_cluster_conf = ClusterTopology.recompute_nodes_conf( + app_name=self.cluster1, nodes=list(old_cluster_conf.values()) ) - self.assertEqual(computed_node_to_change.name, "data1") - self.assertCountEqual(computed_node_to_change.roles, self.cm_roles) + assert new_cluster_conf["data1"].roles == self.cm_roles + # Assert other remaining nodes unchanged + old_cluster_conf.pop("data1") + new_cluster_conf.pop("data1") + assert old_cluster_conf == new_cluster_conf # remove a data node - computed_node_to_change = ClusterTopology.node_with_new_roles( - app_name=self.cluster1, - remaining_nodes=[node for node in cluster_conf if node.name != "data1"], + old_cluster_conf = cluster_conf.copy() + old_cluster_conf.pop("data1") + new_cluster_conf = ClusterTopology.recompute_nodes_conf( + app_name=self.cluster1, nodes=list(old_cluster_conf.values()) ) - self.assertIsNone(computed_node_to_change) + # Assert all remaining nodes unchanged + assert old_cluster_conf == new_cluster_conf def test_auto_recompute_node_roles_in_cluster_5(self): """Test the automatic suggestion of new roles to an existing node.""" - cluster_conf = self.cluster1_5_nodes_conf() + cluster_conf = {node.name: node for node in self.cluster1_5_nodes_conf()} # remove a cluster manager node - computed_node_to_change = ClusterTopology.node_with_new_roles( - app_name=self.cluster1, - remaining_nodes=[node for node in cluster_conf if node.name != "cm1"], + old_cluster_conf = cluster_conf.copy() + old_cluster_conf.pop("cm1") + new_cluster_conf = ClusterTopology.recompute_nodes_conf( + app_name=self.cluster1, nodes=list(old_cluster_conf.values()) ) - self.assertCountEqual(computed_node_to_change.roles, self.base_roles) + assert new_cluster_conf["cm5"].roles == self.base_roles + # Assert other remaining nodes unchanged + old_cluster_conf.pop("cm5") + new_cluster_conf.pop("cm5") + assert old_cluster_conf == new_cluster_conf def test_auto_recompute_node_roles_in_previous_non_auto_gen_cluster(self): """Test the automatic suggestion of new roles to an existing node.""" @@ -126,7 +206,6 @@ def test_auto_recompute_node_roles_in_previous_non_auto_gen_cluster(self): new_node.roles.append("custom_role") first_cluster_nodes.append(new_node) - # remove a cluster manager node computed_node_to_change = ClusterTopology.recompute_nodes_conf( app_name=self.cluster2, nodes=cluster_conf + first_cluster_nodes, @@ -139,6 +218,7 @@ def test_auto_recompute_node_roles_in_previous_non_auto_gen_cluster(self): roles=self.cm_roles, ip=node.ip, app_name=node.app_name, + unit_number=node.unit_number, temperature=node.temperature, ) self.assertCountEqual(computed_node_to_change, expected) @@ -157,55 +237,6 @@ def test_topology_get_cluster_managers_names(self): ["cm1", "cm2", "cm3", "cm4", "cm5"], ) - def test_topology_nodes_count_by_role(self): - """Test correct mapping role / count of nodes with the role.""" - self.assertDictEqual( - ClusterTopology.nodes_count_by_role(self.cluster1_6_nodes_conf()), - { - "cluster_manager": 5, - "coordinating_only": 6, - "data": 6, - "ingest": 6, - "ml": 6, - }, - ) - - def test_refill_node_with_default_roles(self): - """Test the automatic suggestion of new roles to an existing node.""" - # First test with previously set roles in a cluster - cluster2_nodes = self.cluster2_nodes_conf() - - expected = [] - for node in cluster2_nodes: - expected.append( - Node( - name=node.name, - roles=self.cm_roles, - ip=node.ip, - app_name=node.app_name, - temperature=node.temperature, - ) - ) - expected.sort(key=lambda node: node.name) - refilled = ClusterTopology.refill_node_with_default_roles( - app_name=self.cluster2, - nodes=cluster2_nodes, - ) - refilled.sort(key=lambda node: node.name) - for index in range(len(refilled)): - self.assertEqual(refilled[index], expected[index]) - - # test on auto-gen roles: expected no changes - expected = self.cluster1_5_nodes_conf() - expected.sort(key=lambda node: node.name) - refilled = ClusterTopology.refill_node_with_default_roles( - app_name=self.cluster1, - nodes=self.cluster1_5_nodes_conf(), - ) - refilled.sort(key=lambda node: node.name) - for index in range(len(refilled)): - self.assertEqual(refilled[index], expected[index]) - @patch("charms.opensearch.v0.helper_cluster.ClusterState.shards") def test_state_busy_shards_by_unit(self, shards): """Test the busy shards filtering.""" @@ -227,7 +258,11 @@ def test_state_busy_shards_by_unit(self, shards): def test_node_obj_creation_from_json(self): """Test the creation of a Node object from a dict representation.""" raw_node = Node( - name="cm1", roles=["cluster_manager"], ip="0.0.0.11", app_name=self.cluster1 + name="cm1", + roles=["cluster_manager"], + ip="0.0.0.11", + app_name=self.cluster1, + unit_number=0, ) from_json_node = Node.from_dict( { @@ -235,6 +270,7 @@ def test_node_obj_creation_from_json(self): "roles": ["cluster_manager"], "ip": "0.0.0.11", "app_name": self.cluster1, + "unit_number": 0, } ) diff --git a/tests/unit/lib/test_opensearch_base_charm.py b/tests/unit/lib/test_opensearch_base_charm.py index 78b1f1800..917fd76e5 100644 --- a/tests/unit/lib/test_opensearch_base_charm.py +++ b/tests/unit/lib/test_opensearch_base_charm.py @@ -69,6 +69,7 @@ def setUp(self) -> None: roles=["cluster_manager", "data"], ip="1.1.1.1", app_name="opensearch-ff2z", + unit_number=3, ) self.opensearch.is_failed = MagicMock() self.opensearch.is_failed.return_value = False diff --git a/tests/unit/lib/test_opensearch_peer_clusters.py b/tests/unit/lib/test_opensearch_peer_clusters.py index 66aad09ec..18f6403b4 100644 --- a/tests/unit/lib/test_opensearch_peer_clusters.py +++ b/tests/unit/lib/test_opensearch_peer_clusters.py @@ -123,6 +123,7 @@ def test_validate_roles( roles=["cluster_manager", "data"], ip="1.1.1.1", app_name="logs", + unit_number=int(node.name.split("/")[-1]), ) for node in self.p_units[0:3] ] @@ -137,9 +138,10 @@ def test_validate_roles( roles=["cluster_manager", "data"], ip="1.1.1.1", app_name="logs", + unit_number=int(node.name.split("/")[-1]), ) for node in self.p_units[0:4] - ] + [Node(name="node", roles=["ml"], ip="0.0.0.0", app_name="logs")] + ] + [Node(name="node", roles=["ml"], ip="0.0.0.0", app_name="logs", unit_number=7)] self.peer_cm.validate_roles(nodes=nodes, on_new_unit=False) @patch("ops.model.Model.get_relation") @@ -167,9 +169,10 @@ def test_pre_validate_roles_change( roles=["data"], ip="1.1.1.1", app_name="logs", + unit_number=int(node.name.split("/")[-1]), ) for node in self.p_units - ] + [Node(name="node-5", roles=["data"], ip="2.2.2.2", app_name="logs")] + ] + [Node(name="node-5", roles=["data"], ip="2.2.2.2", app_name="logs", unit_number=5)] self.peer_cm._pre_validate_roles_change(new_roles=["ml"], prev_roles=["data", "ml"]) except OpenSearchProvidedRolesException: self.fail("_pre_validate_roles_change() failed unexpectedly.") @@ -194,6 +197,7 @@ def test_pre_validate_roles_change( roles=["data"], ip="1.1.1.1", app_name="logs", + unit_number=int(node.name.split("/")[-1]), ) for node in self.p_units ]