diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py index 147801ce9..c8967470e 100644 --- a/lib/charms/mysql/v0/mysql.py +++ b/lib/charms/mysql/v0/mysql.py @@ -134,7 +134,7 @@ def wait_until_mysql_connection(self) -> None: # Increment this major API version when introducing breaking changes LIBAPI = 0 -LIBPATCH = 70 +LIBPATCH = 72 UNIT_TEARDOWN_LOCKNAME = "unit-teardown" UNIT_ADD_LOCKNAME = "unit-add" @@ -147,6 +147,7 @@ def wait_until_mysql_connection(self) -> None: GET_MEMBER_STATE_TIME = 10 # seconds MAX_CONNECTIONS_FLOOR = 10 MIM_MEM_BUFFERS = 200 * BYTES_1MiB +ADMIN_PORT = 33062 SECRET_INTERNAL_LABEL = "secret-id" SECRET_DELETED_LABEL = "None" @@ -882,6 +883,30 @@ def __init__( self.backups_password, ] + def instance_def(self, user: str, host: Optional[str] = None) -> str: + """Return instance definition used on mysqlsh. + + Args: + user: User name. + host: Host name, default to unit address. + """ + password_map = { + self.server_config_user: self.server_config_password, + self.cluster_admin_user: self.cluster_admin_password, + "root": self.root_password, + self.backups_user: self.backups_password, + } + if host and ":" in host: + # strip port from address + host = host.split(":")[0] + + if user in (self.server_config_user, self.backups_user): + # critical operator users use admin address + return f"{user}:{password_map[user]}@{host or self.instance_address}:{ADMIN_PORT}" + elif host != self.instance_address: + return f"{user}:{password_map[user]}@{host}:3306" + return f"{user}:{password_map[user]}@{self.socket_uri}" + def render_mysqld_configuration( # noqa: C901 self, *, @@ -948,6 +973,7 @@ def render_mysqld_configuration( # noqa: C901 config["mysqld"] = { "bind-address": "0.0.0.0", "mysqlx-bind-address": "0.0.0.0", + "admin_address": self.instance_address, "report_host": self.instance_address, "max_connections": str(max_connections), "innodb_buffer_pool_size": str(innodb_buffer_pool_size), @@ -1151,13 +1177,13 @@ def configure_mysqlrouter_user( ) # Using server_config_user as we are sure it has create user grants create_mysqlrouter_user_commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"session.run_sql(\"CREATE USER '{username}'@'{hostname}' IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_mysqlrouter_user_attributes}';\")", ) # Using server_config_user as we are sure it has create user grants mysqlrouter_user_grant_commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"session.run_sql(\"GRANT CREATE USER ON *.* TO '{username}'@'{hostname}' WITH GRANT OPTION;\")", f"session.run_sql(\"GRANT SELECT, INSERT, UPDATE, DELETE, EXECUTE ON mysql_innodb_cluster_metadata.* TO '{username}'@'{hostname}';\")", f"session.run_sql(\"GRANT SELECT ON mysql.user TO '{username}'@'{hostname}';\")", @@ -1191,7 +1217,7 @@ def create_application_database_and_scoped_user( try: # Using server_config_user as we are sure it has create database grants connect_command = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", ) create_database_commands = ( f'session.run_sql("CREATE DATABASE IF NOT EXISTS `{database_name}`;")', @@ -1228,7 +1254,11 @@ def _get_statements_to_delete_users_with_attribute( (e.g. "'bar'") """ return [ - f"session.run_sql(\"SELECT IFNULL(CONCAT('DROP USER ', GROUP_CONCAT(QUOTE(USER), '@', QUOTE(HOST))), 'SELECT 1') INTO @sql FROM INFORMATION_SCHEMA.USER_ATTRIBUTES WHERE ATTRIBUTE->'$.{attribute_name}'={attribute_value}\")", + ( + "session.run_sql(\"SELECT IFNULL(CONCAT('DROP USER ', GROUP_CONCAT(QUOTE(USER)," + " '@', QUOTE(HOST))), 'SELECT 1') INTO @sql FROM INFORMATION_SCHEMA.USER_ATTRIBUTES" + f" WHERE ATTRIBUTE->'$.{attribute_name}'={attribute_value}\")" + ), 'session.run_sql("PREPARE stmt FROM @sql")', 'session.run_sql("EXECUTE stmt")', 'session.run_sql("DEALLOCATE PREPARE stmt")', @@ -1240,8 +1270,12 @@ def get_mysql_router_users_for_unit( """Get users for related MySQL Router unit.""" relation_user = f"relation-{relation_id}" command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", - f"result = session.run_sql(\"SELECT USER, ATTRIBUTE->>'$.router_id' FROM INFORMATION_SCHEMA.USER_ATTRIBUTES WHERE ATTRIBUTE->'$.created_by_user'='{relation_user}' AND ATTRIBUTE->'$.created_by_juju_unit'='{mysql_router_unit_name}'\")", + f"shell.connect('{self.instance_def(self.server_config_user)}')", + ( + "result = session.run_sql(\"SELECT USER, ATTRIBUTE->>'$.router_id' FROM " + f"INFORMATION_SCHEMA.USER_ATTRIBUTES WHERE ATTRIBUTE->'$.created_by_user'='{relation_user}' " + f"AND ATTRIBUTE->'$.created_by_juju_unit'='{mysql_router_unit_name}'\")" + ), "print(result.fetch_all())", ] try: @@ -1257,7 +1291,7 @@ def get_mysql_router_users_for_unit( def delete_users_for_unit(self, unit_name: str) -> None: """Delete users for a unit.""" drop_users_command = [ - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", ] drop_users_command.extend( self._get_statements_to_delete_users_with_attribute("unit_name", f"'{unit_name}'") @@ -1271,7 +1305,7 @@ def delete_users_for_unit(self, unit_name: str) -> None: def delete_users_for_relation(self, username: str) -> None: """Delete users for a relation.""" drop_users_command = [ - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"session.run_sql(\"DROP USER IF EXISTS '{username}'@'%';\")", ] # If the relation is with a MySQL Router charm application, delete any users @@ -1288,7 +1322,7 @@ def delete_users_for_relation(self, username: str) -> None: def delete_user(self, username: str) -> None: """Delete user.""" drop_user_command = [ - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"session.run_sql(\"DROP USER `{username}`@'%'\")", ] try: @@ -1300,7 +1334,7 @@ def delete_user(self, username: str) -> None: def remove_router_from_cluster_metadata(self, router_id: str) -> None: """Remove MySQL Router from InnoDB Cluster metadata.""" command = [ - f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", "cluster = dba.get_cluster()", f'cluster.remove_router_metadata("{router_id}")', ] @@ -1318,16 +1352,13 @@ def set_dynamic_variable( instance_address: Optional[str] = None, ) -> None: """Set a dynamic variable value for the instance.""" - if not instance_address: - instance_address = self.socket_uri - # escape variable values when needed if not re.match(r"^[0-9,a-z,A-Z$_]+$", value): value = f"`{value}`" logger.debug(f"Setting {variable=} to {value=}") set_var_command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user, instance_address)}')", f"session.run_sql(\"SET {'PERSIST' if persist else 'GLOBAL'} {variable}={value}\")", ] @@ -1340,7 +1371,7 @@ def set_dynamic_variable( def get_variable_value(self, variable: str) -> str: """Get the value of a variable.""" get_var_command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"result = session.run_sql(\"SHOW VARIABLES LIKE '{variable}'\")", "print(result.fetch_all())", ] @@ -1348,7 +1379,7 @@ def get_variable_value(self, variable: str) -> str: try: output = self._run_mysqlsh_script("\n".join(get_var_command)) except MySQLClientError: - logger.exception(f"Failed to get variable {variable}") + logger.exception(f"Failed to get value for {variable=}") raise MySQLGetVariableError rows = json.loads(output) @@ -1367,7 +1398,7 @@ def configure_instance(self, create_cluster_admin: bool = True) -> None: }) configure_instance_command = ( - f"dba.configure_instance('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}', {json.dumps(options)})", + f"dba.configure_instance('{self.instance_def(self.server_config_user)}', {options})", ) try: @@ -1387,8 +1418,8 @@ def create_cluster(self, unit_label: str) -> None: } commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", - f"cluster = dba.create_cluster('{self.cluster_name}', {json.dumps(options)})", + f"shell.connect('{self.instance_def(self.server_config_user)}')", + f"cluster = dba.create_cluster('{self.cluster_name}', {options})", f"cluster.set_instance_option('{self.instance_address}', 'label', '{unit_label}')", ) @@ -1402,7 +1433,7 @@ def create_cluster(self, unit_label: str) -> None: def create_cluster_set(self) -> None: """Create a cluster set for the cluster on cluster primary.""" commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", f"cluster.create_cluster_set('{self.cluster_set_name}')", ) @@ -1434,7 +1465,7 @@ def create_replica_cluster( options["cloneDonor"] = donor commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", "cs = dba.get_cluster_set()", f"repl_cluster = cs.create_replica_cluster('{endpoint}','{replica_cluster_name}', {options})", f"repl_cluster.set_instance_option('{endpoint}', 'label', '{instance_label}')", @@ -1462,7 +1493,7 @@ def create_replica_cluster( def promote_cluster_to_primary(self, cluster_name: str, force: bool = False) -> None: """Promote a cluster to become the primary cluster on the cluster set.""" commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", "cs = dba.get_cluster_set()", ( f"cs.force_primary_cluster('{cluster_name}')" @@ -1481,7 +1512,7 @@ def promote_cluster_to_primary(self, cluster_name: str, force: bool = False) -> def fence_writes(self) -> None: """Fence writes on the primary cluster.""" commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", "c = dba.get_cluster()", "c.fence_writes()", ) @@ -1495,7 +1526,7 @@ def fence_writes(self) -> None: def unfence_writes(self) -> None: """Unfence writes on the primary cluster and reset read_only flag.""" commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", "c = dba.get_cluster()", "c.unfence_writes()", "session.run_sql('SET GLOBAL read_only=OFF')", @@ -1527,7 +1558,7 @@ def is_cluster_in_cluster_set(self, cluster_name: str) -> Optional[bool]: def cluster_metadata_exists(self, from_instance: str) -> bool: """Check if this cluster metadata exists on database.""" check_cluster_metadata_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{from_instance}')", + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", ( 'result = session.run_sql("SELECT cluster_name FROM mysql_innodb_cluster_metadata' f".clusters where cluster_name = '{self.cluster_name}';\")" @@ -1537,7 +1568,7 @@ def cluster_metadata_exists(self, from_instance: str) -> bool: try: output = self._run_mysqlsh_script( - "\n".join(check_cluster_metadata_commands), timeout=10 + "\n".join(check_cluster_metadata_commands), timeout=60 ) except MySQLClientError: logger.warning(f"Failed to check if cluster metadata exists {from_instance=}") @@ -1548,7 +1579,7 @@ def cluster_metadata_exists(self, from_instance: str) -> bool: def rejoin_cluster(self, cluster_name) -> None: """Try to rejoin a cluster to the cluster set.""" commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", "cs = dba.get_cluster_set()", f"cs.rejoin_cluster('{cluster_name}')", ) @@ -1564,7 +1595,7 @@ def rejoin_cluster(self, cluster_name) -> None: def remove_replica_cluster(self, replica_cluster_name: str, force: bool = False) -> None: """Remove a replica cluster from the cluster-set.""" commands = [ - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", "cs = dba.get_cluster_set()", ] if force: @@ -1632,11 +1663,9 @@ def add_instance_to_cluster( ): raise MySQLLockAcquisitionError("Lock not acquired") + connect_instance = from_instance or self.instance_address connect_commands = ( - ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}" - f"@{from_instance or self.instance_address}')" - ), + f"shell.connect('{self.instance_def(self.server_config_user, connect_instance)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", "shell.options['dba.restartWaitTimeout'] = 3600", ) @@ -1680,7 +1709,7 @@ def is_instance_configured_for_innodb( ) -> bool: """Confirm if instance is configured for use in an InnoDB cluster.""" commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user, instance_address)}')", "instance_configured = dba.check_instance_configuration()['status'] == 'ok'", 'print("INSTANCE_CONFIGURED" if instance_configured else "INSTANCE_NOT_CONFIGURED")', ) @@ -1702,10 +1731,7 @@ def is_instance_configured_for_innodb( def are_locks_acquired(self, from_instance: Optional[str] = None) -> bool: """Report if any topology change is being executed.""" commands = ( - ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}" - f"@{from_instance or self.socket_uri}')" - ), + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", "result = session.run_sql(\"SELECT COUNT(*) FROM mysql.juju_units_operations WHERE status='in-progress';\")", "print(f'{result.fetch_one()[0]}')", ) @@ -1734,12 +1760,9 @@ def rescan_cluster( options["addInstances"] = "auto" rescan_cluster_commands = ( - ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@" - f"{from_instance or self.socket_uri}')" - ), + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", - f"cluster.rescan({json.dumps(options)})", + f"cluster.rescan({options})", ) try: logger.debug("Rescanning cluster") @@ -1751,7 +1774,7 @@ def rescan_cluster( def is_instance_in_cluster(self, unit_label: str) -> bool: """Confirm if instance is in the cluster.""" commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", f"print(cluster.status()['defaultReplicaSet']['topology'].get('{unit_label}', {{}}).get('status', 'NOT_A_MEMBER'))", ) @@ -1782,7 +1805,7 @@ def get_cluster_status( """Get the cluster status dictionary.""" options = {"extended": extended} status_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{from_instance or self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", f"print(cluster.status({options}))", ) @@ -1800,7 +1823,7 @@ def get_cluster_set_status( """Get the cluster-set status dictionary.""" options = {"extended": extended} status_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{from_instance or self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", "cs = dba.get_cluster_set()", f"print(cs.status({options}))", ) @@ -1824,7 +1847,7 @@ def get_replica_cluster_status(self, replica_cluster_name: Optional[str] = None) if not replica_cluster_name: replica_cluster_name = self.cluster_name status_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", "cs = dba.get_cluster_set()", f"print(cs.status(extended=1)['clusters']['{replica_cluster_name}']['globalStatus'])", ) @@ -1850,8 +1873,7 @@ def get_cluster_node_count( f" WHERE member_state = '{node_status.value.upper()}'" ) size_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}" - f"@{from_instance or self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user, from_instance)}')", f'result = session.run_sql("{query}")', 'print(f"{result.fetch_one()[0]}")', ) @@ -1895,6 +1917,8 @@ def _get_host_ip(host: str) -> str: if self.is_cluster_replica(): # replica return global primary address global_primary = self.get_cluster_set_global_primary_address() + if not global_primary: + raise MySQLGetClusterEndpointsError("Failed to get global primary address") rw_endpoints = {_get_host_ip(global_primary) if get_ips else global_primary} else: rw_endpoints = { @@ -1923,7 +1947,7 @@ def execute_remove_instance( "force": "true" if force else "false", } remove_instance_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance or self.instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user, connect_instance)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", "cluster.remove_instance(" f"'{self.cluster_admin_user}@{self.instance_address}', {remove_instance_options})", @@ -2036,7 +2060,7 @@ def dissolve_cluster(self) -> None: """Dissolve the cluster independently of the unit teardown process.""" logger.debug(f"Dissolving cluster {self.cluster_name}") dissolve_cluster_commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", "cluster.dissolve({'force': 'true'})", ) @@ -2049,9 +2073,15 @@ def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) - ) acquire_lock_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{primary_address}')", - f"session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='{unit_label}', status='in-progress' WHERE task='{lock_name}' AND executor='';\")", - f"acquired_lock = session.run_sql(\"SELECT count(*) FROM mysql.juju_units_operations WHERE task='{lock_name}' AND executor='{unit_label}';\").fetch_one()[0]", + f"shell.connect('{self.instance_def(self.server_config_user, host=primary_address)}')", + ( + f"session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='{unit_label}'," + f" status='in-progress' WHERE task='{lock_name}' AND executor='';\")" + ), + ( + 'acquired_lock = session.run_sql("SELECT count(*) FROM mysql.juju_units_operations' + f" WHERE task='{lock_name}' AND executor='{unit_label}';\").fetch_one()[0]" + ), "print(f'{acquired_lock}')", ) @@ -2068,23 +2098,32 @@ def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) - def _release_lock(self, primary_address: str, unit_label: str, lock_name: str) -> None: """Releases a lock in the mysql.juju_units_operations table.""" - logger.debug(f"Releasing lock {lock_name} on {primary_address} for unit {unit_label}") + logger.debug(f"Releasing {lock_name=} @{primary_address=} for {unit_label=}") release_lock_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{primary_address}')", - "session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='', status='not-started'" + f"shell.connect('{self.instance_def(self.server_config_user, host=primary_address)}')", + "r = session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='', status='not-started'" f" WHERE task='{lock_name}' AND executor='{unit_label}';\")", + "print(r.get_affected_items_count())", ) - self._run_mysqlsh_script("\n".join(release_lock_commands)) + affected_rows = self._run_mysqlsh_script("\n".join(release_lock_commands)) + if affected_rows: + if int(affected_rows) == 0: + logger.warning("No lock to release") + else: + logger.debug(f"{lock_name=} released for {unit_label=}") def _get_cluster_member_addresses(self, exclude_unit_labels: List = []) -> Tuple[List, bool]: """Get the addresses of the cluster's members.""" logger.debug(f"Getting cluster member addresses, excluding units {exclude_unit_labels}") get_cluster_members_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", - f"member_addresses = ','.join([member['address'] for label, member in cluster.status()['defaultReplicaSet']['topology'].items() if label not in {exclude_unit_labels}])", + ( + "member_addresses = ','.join([member['address'] for label, member in " + f"cluster.status()['defaultReplicaSet']['topology'].items() if label not in {exclude_unit_labels}])" + ), "print(f'{member_addresses}')", ) @@ -2105,12 +2144,10 @@ def get_cluster_primary_address( self, connect_instance_address: Optional[str] = None ) -> Optional[str]: """Get the cluster primary's address.""" - if not connect_instance_address: - connect_instance_address = self.instance_address - logger.debug(f"Getting cluster primary member's address from {connect_instance_address}") + logger.debug("Getting cluster primary member's address") get_cluster_primary_commands = ( - f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user, host=connect_instance_address)}')", "primary_address = shell.parse_uri(session.uri)['host']", "print(f'{primary_address}')", ) @@ -2131,14 +2168,10 @@ def get_cluster_set_global_primary_address( self, connect_instance_address: Optional[str] = None ) -> Optional[str]: """Get the cluster set global primary's address.""" - if not connect_instance_address: - connect_instance_address = self.instance_address - logger.debug( - f"Getting cluster set global primary member's address from {connect_instance_address}" - ) + logger.debug("Getting cluster set global primary member's address") get_cluster_set_global_primary_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user, host=connect_instance_address)}')", "cs = dba.get_cluster_set()", "global_primary = cs.status()['globalPrimaryInstance']", "print(f'{global_primary}')", @@ -2154,7 +2187,12 @@ def get_cluster_set_global_primary_address( if not matches: return None - return matches.group(1) + address = matches.group(1) + if ":" in address: + # strip port from address + address = address.split(":")[0] + + return address def get_primary_label(self) -> Optional[str]: """Get the label of the cluster's primary.""" @@ -2175,7 +2213,7 @@ def set_cluster_primary(self, new_primary_address: str) -> None: logger.debug(f"Setting cluster primary to {new_primary_address}") set_cluster_primary_commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", f"cluster.set_primary_instance('{new_primary_address}')", ) @@ -2188,7 +2226,7 @@ def set_cluster_primary(self, new_primary_address: str) -> None: def get_cluster_members_addresses(self) -> Optional[Iterable[str]]: """Get the addresses of the cluster's members.""" get_cluster_members_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", "members = ','.join((member['address'] for member in cluster.describe()['defaultReplicaSet']['topology']))", "print(f'{members}')", @@ -2209,9 +2247,9 @@ def get_cluster_members_addresses(self) -> Optional[Iterable[str]]: def verify_server_upgradable(self, instance: Optional[str] = None) -> None: """Wrapper for API check_for_server_upgrade.""" + # use cluster admin user to enforce standard port usage check_command = [ - f"shell.connect('{self.server_config_user}" - f":{self.server_config_password}@{instance or self.socket_uri}')", + f"shell.connect('{self.instance_def(self.cluster_admin_user, host=instance)}')", "try:", " util.check_for_server_upgrade(options={'outputFormat': 'JSON'})", "except ValueError:", # ValueError is raised for same version check @@ -2244,7 +2282,7 @@ def get_mysql_version(self) -> Optional[str]: logger.debug("Getting InnoDB version") get_version_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", 'result = session.run_sql("SELECT version()")', 'print(f"{result.fetch_one()[0]}")', ) @@ -2267,8 +2305,11 @@ def grant_privileges_to_user( ) -> None: """Grants specified privileges to the provided database user.""" grant_privileges_commands = ( - f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", - f"session.run_sql(\"GRANT {', '.join(privileges)} ON *.* TO '{username}'@'{hostname}'{' WITH GRANT OPTION' if with_grant_option else ''}\")", + f"shell.connect_to_primary('{self.instance_def(self.server_config_user)}')", + ( + f"session.run_sql(\"GRANT {', '.join(privileges)} ON *.* TO '{username}'@'{hostname}'" + f"{' WITH GRANT OPTION' if with_grant_option else ''}\")" + ), ) try: @@ -2279,26 +2320,22 @@ def grant_privileges_to_user( def update_user_password(self, username: str, new_password: str, host: str = "%") -> None: """Updates user password in MySQL database.""" - logger.debug(f"Updating password for {username}.") - # password is set on the global primary if not (instance_address := self.get_cluster_set_global_primary_address()): raise MySQLCheckUserExistenceError("No primary found") update_user_password_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user, host=instance_address)}')", f"session.run_sql(\"ALTER USER '{username}'@'{host}' IDENTIFIED BY '{new_password}';\")", 'session.run_sql("FLUSH PRIVILEGES;")', ) + logger.debug(f"Updating password for {username}.") try: self._run_mysqlsh_script("\n".join(update_user_password_commands)) - except MySQLClientError as e: - logger.exception( - f"Failed to update user password for user {username}", - exc_info=e, - ) - raise MySQLCheckUserExistenceError(e.message) + except MySQLClientError: + logger.exception(f"Failed to update user password for user {username}") + raise MySQLCheckUserExistenceError @retry(reraise=True, stop=stop_after_attempt(3), wait=wait_fixed(GET_MEMBER_STATE_TIME)) def get_member_state(self) -> Tuple[str, str]: @@ -2370,7 +2407,7 @@ def get_cluster_set_name(self, from_instance: Optional[str] = None) -> Optional[ def stop_group_replication(self) -> None: """Stop Group replication if enabled on the instance.""" stop_gr_command = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", "data = session.run_sql('SELECT 1 FROM performance_schema.replication_group_members')", "if len(data.fetch_all()) > 0:", " session.run_sql('STOP GROUP_REPLICATION')", @@ -2383,7 +2420,7 @@ def stop_group_replication(self) -> None: def reboot_from_complete_outage(self) -> None: """Wrapper for reboot_cluster_from_complete_outage command.""" reboot_from_outage_command = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"dba.reboot_cluster_from_complete_outage('{self.cluster_name}')", ) @@ -2414,8 +2451,8 @@ def set_instance_offline_mode(self, offline_mode: bool = False) -> None: try: self._run_mysqlcli_script( "; ".join(set_instance_offline_mode_commands), - user=self.cluster_admin_user, - password=self.cluster_admin_password, + user=self.server_config_user, + password=self.server_config_password, ) except MySQLClientError as e: logger.exception(f"Failed to set instance state to offline_mode {mode}") @@ -2424,7 +2461,7 @@ def set_instance_offline_mode(self, offline_mode: bool = False) -> None: def set_instance_option(self, option: str, value: Any) -> None: """Sets an instance option.""" set_instance_option_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"cluster = dba.get_cluster('{self.cluster_name}')", f"cluster.set_instance_option('{self.instance_address}', '{option}', '{value}')", ) @@ -2439,7 +2476,7 @@ def offline_mode_and_hidden_instance_exists(self) -> bool: """Indicates whether an instance exists in offline_mode and hidden from router.""" offline_mode_message = "Instance has offline_mode enabled" commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", f"cluster_topology = dba.get_cluster('{self.cluster_name}').status()['defaultReplicaSet']['topology']", f"selected_instances = [label for label, member in cluster_topology.items() if '{offline_mode_message}' in member.get('instanceErrors', '') and member.get('hiddenFromRouter')]", "print(f'{len(selected_instances)}')", @@ -2628,9 +2665,9 @@ def delete_temp_backup_directory( except MySQLExecError as e: logger.exception("Failed to delete temp backup directory") raise MySQLDeleteTempBackupDirectoryError(e.message) - except Exception as e: + except Exception: logger.exception("Failed to delete temp backup directory") - raise MySQLDeleteTempBackupDirectoryError(e) + raise MySQLDeleteTempBackupDirectoryError def retrieve_backup_with_xbcloud( self, @@ -2886,7 +2923,7 @@ def tls_setup( def kill_unencrypted_sessions(self) -> None: """Kill non local, non system open unencrypted connections.""" kill_connections_command = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", ( 'processes = session.run_sql("' "SELECT processlist_id FROM performance_schema.threads WHERE " @@ -2906,7 +2943,7 @@ def kill_unencrypted_sessions(self) -> None: def kill_client_sessions(self) -> None: """Kill non local, non system open unencrypted connections.""" kill_connections_command = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", ( 'processes = session.run_sql("' "SELECT processlist_id FROM performance_schema.threads WHERE " @@ -2926,7 +2963,7 @@ def kill_client_sessions(self) -> None: def check_mysqlsh_connection(self) -> bool: """Checks if it is possible to connect to the server with mysqlsh.""" connect_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", 'session.run_sql("SELECT 1")', ) @@ -2950,7 +2987,7 @@ def get_pid_of_port_3306(self) -> Optional[str]: def flush_mysql_logs(self, logs_type: Union[MySQLTextLogs, list[MySQLTextLogs]]) -> None: """Flushes the specified logs_type logs.""" flush_logs_commands = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", 'session.run_sql("SET sql_log_bin = 0")', ] @@ -2975,7 +3012,7 @@ def flush_mysql_logs(self, logs_type: Union[MySQLTextLogs, list[MySQLTextLogs]]) def get_databases(self) -> set[str]: """Return a set with all databases on the server.""" list_databases_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.socket_uri}')", + f"shell.connect('{self.instance_def(self.server_config_user)}')", 'result = session.run_sql("SHOW DATABASES")', "for db in result.fetch_all():\n print(db[0])", ) diff --git a/src/config.py b/src/config.py index a075bff4d..40994e5fa 100644 --- a/src/config.py +++ b/src/config.py @@ -132,7 +132,8 @@ def experimental_max_connections_validator(cls, value: int) -> Optional[int]: """Check experimental max connections.""" if value < MAX_CONNECTIONS_FLOOR: raise ValueError( - f"experimental-max-connections must be greater than {MAX_CONNECTIONS_FLOOR}" + f"experimental-max-connections ({value=}) must be equal or greater " + + f" than {MAX_CONNECTIONS_FLOOR}" ) return value diff --git a/tests/integration/connector.py b/tests/integration/connector.py index 733694eaa..a71c06359 100644 --- a/tests/integration/connector.py +++ b/tests/integration/connector.py @@ -37,3 +37,29 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.connection.commit() self.cursor.close() self.connection.close() + + +def create_db_connections( + num_connections: int, host: str, username: str, password: str, database: str +) -> list[mysql.connector.MySQLConnection]: + """Create a list of database connections. + + Args: + num_connections: Number of connections to create. + host: Hostname of the database. + username: Username to connect to the database. + password: Password to connect to the database. + database: Database to connect to. + """ + connections = [] + for _ in range(num_connections): + conn = mysql.connector.connect( + host=host, + user=username, + password=password, + database=database, + use_pure=True, + ) + if conn.is_connected(): + connections.append(conn) + return connections diff --git a/tests/integration/test_saturate_max_connections.py b/tests/integration/test_saturate_max_connections.py new file mode 100644 index 000000000..700af07dd --- /dev/null +++ b/tests/integration/test_saturate_max_connections.py @@ -0,0 +1,101 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +from pathlib import Path + +import pytest +import yaml +from mysql.connector.errors import OperationalError +from pytest_operator.plugin import OpsTest + +from .connector import create_db_connections +from .helpers import get_unit_address +from .juju_ import run_action + +logger = logging.getLogger(__name__) + +MYSQL_APP_NAME = "mysql" +TEST_APP_NAME = "app" +CONNECTIONS = 10 +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_build_and_deploy(ops_test: OpsTest) -> None: + """Build the charm and deploy 1 units to ensure a cluster is formed.""" + charm = await ops_test.build_charm(".") + config = {"profile-limit-memory": "2000", "experimental-max-connections": CONNECTIONS} + resources = {"mysql-image": METADATA["resources"]["mysql-image"]["upstream-source"]} + + await ops_test.model.deploy( + charm, + application_name=MYSQL_APP_NAME, + config=config, + num_units=1, + base="ubuntu@22.04", + resources=resources, + trust=True, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_deploy_and_relate_test_app(ops_test: OpsTest) -> None: + config = {"auto_start_writes": False, "sleep_interval": "500"} + logger.info("Deploying test app") + await ops_test.model.deploy( + "mysql-test-app", + application_name=TEST_APP_NAME, + num_units=1, + base="ubuntu@22.04", + config=config, + channel="latest/edge", + ) + + logger.info("Relating test app to mysql") + await ops_test.model.relate(MYSQL_APP_NAME, f"{TEST_APP_NAME}:database") + + logger.info("Waiting all to be active") + await ops_test.model.block_until( + lambda: all(unit.workload_status == "active" for unit in ops_test.model.units.values()), + timeout=60 * 10, + wait_period=5, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_saturate_max_connections(ops_test: OpsTest) -> None: + app_unit = ops_test.model.applications[TEST_APP_NAME].units[0] + mysql_unit = ops_test.model.applications[MYSQL_APP_NAME].units[0] + + host_ip = await get_unit_address(ops_test, mysql_unit.name) + logger.info("Running action to get app connection data") + credentials = await run_action(app_unit, "get-client-connection-data") + if "return-code" in credentials: + # juju 2.9 dont have the return-code key + del credentials["return-code"] + if "Code" in credentials: + del credentials["Code"] + credentials["host"] = host_ip + + logger.info(f"Creating {CONNECTIONS} connections") + connections = create_db_connections(CONNECTIONS, **credentials) + assert isinstance(connections, list), "Connections not created" + + logger.info("Ensure all connections are established") + for conn in connections: + assert conn.is_connected(), "Connection failed to establish" + + assert len(connections) == CONNECTIONS, "Not all connections were established" + + logger.info("Ensure no more client connections are possible") + + with pytest.raises(OperationalError): + # exception raised when too many connections are attempted + create_db_connections(1, **credentials) + + logger.info("Get cluster status while connections are saturated") + _ = await run_action(mysql_unit, "get-cluster-status")