diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py
index 6a021b8368aea..a0a95124d5c24 100644
--- a/tests/rptest/services/redpanda.py
+++ b/tests/rptest/services/redpanda.py
@@ -675,7 +675,201 @@ def __init__(self):
         super(SchemaRegistryConfig, self).__init__()
 
 
-class RedpandaService(Service):
+class RedpandaServiceBase(Service):
+    def __init__(
+            self,
+            context,
+            num_brokers,
+            extra_rp_conf=None,
+            resource_settings=None,
+            si_settings=None,
+    ):
+        super(RedpandaServiceBase, self).__init__(context,
+                                                  num_nodes=num_brokers)
+        self._context = context
+        self._extra_rp_conf = extra_rp_conf or dict()
+
+        if si_settings is not None:
+            self.set_si_settings(si_settings)
+        else:
+            self._si_settings = None
+
+        if resource_settings is None:
+            resource_settings = ResourceSettings()
+        self._resource_settings = resource_settings
+
+    def start_node(self, node, **kwargs):
+        pass
+
+    def stop_node(self, node, **kwargs):
+        pass
+
+    def clean_node(self, node, **kwargs):
+        pass
+
+    def restart_nodes(self,
+                      nodes,
+                      override_cfg_params=None,
+                      start_timeout=None,
+                      stop_timeout=None,
+                      auto_assign_node_id=False,
+                      omit_seeds_on_idx_one=True):
+
+        nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
+        with concurrent.futures.ThreadPoolExecutor(
+                max_workers=len(nodes)) as executor:
+            # The list() wrapper is to cause futures to be evaluated here+now
+            # (including throwing any exceptions) and not just spawned in background.
+            list(
+                executor.map(lambda n: self.stop_node(n, timeout=stop_timeout),
+                             nodes))
+            list(
+                executor.map(
+                    lambda n: self.start_node(
+                        n,
+                        override_cfg_params,
+                        timeout=start_timeout,
+                        auto_assign_node_id=auto_assign_node_id,
+                        omit_seeds_on_idx_one=omit_seeds_on_idx_one), nodes))
+
+    def set_extra_rp_conf(self, conf):
+        self._extra_rp_conf = conf
+        if self._si_settings is not None:
+            self._extra_rp_conf = self._si_settings.update_rp_conf(
+                self._extra_rp_conf)
+
+    def set_si_settings(self, si_settings: SISettings):
+        si_settings.load_context(self.logger, self._context)
+        self._si_settings = si_settings
+        self._extra_rp_conf = self._si_settings.update_rp_conf(
+            self._extra_rp_conf)
+
+    def add_extra_rp_conf(self, conf):
+        self._extra_rp_conf = {**self._extra_rp_conf, **conf}
+
+    def get_node_memory_mb(self):
+        pass
+
+    def get_node_cpu_count(self):
+        pass
+
+    def get_node_disk_free(self):
+        pass
+
+    def lsof_node(self, node: ClusterNode, filter: Optional[str] = None):
+        pass
+
+    def metrics(self,
+                node,
+                metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS):
+        assert node in self._started, f"where node is {node.name}"
+
+        metrics_endpoint = ("/metrics" if metrics_endpoint
+                            == MetricsEndpoint.METRICS else "/public_metrics")
+        url = f"http://{node.account.hostname}:9644{metrics_endpoint}"
+        resp = requests.get(url)
+        assert resp.status_code == 200
+        return text_string_to_metric_families(resp.text)
+
+    def metric_sum(self,
+                   metric_name,
+                   metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS,
+                   ns=None,
+                   topic=None):
+        """
+        Pings the 'metrics_endpoint' of each node and returns the summed values
+        of the given metric, optionally filtering by namespace and topic.
+        """
+        count = 0
+        for n in self.nodes:
+            metrics = self.metrics(n, metrics_endpoint=metrics_endpoint)
+            for family in metrics:
+                for sample in family.samples:
+                    if ns and sample.labels["namespace"] != ns:
+                        continue
+                    if topic and sample.labels["topic"] != topic:
+                        continue
+                    if sample.name == metric_name:
+                        count += int(sample.value)
+        return count
+
+    def healthy(self):
+        """
+        A primitive health check on all the nodes which returns True when all
+        nodes report that no under replicated partitions exist. This should
+        later be replaced by a proper / official start-up probe type check on
+        the health of a node after a restart.
+        """
+        counts = {self.idx(node): None for node in self.nodes}
+        for node in self.nodes:
+            try:
+                metrics = self.metrics(node)
+            except:
+                return False
+            idx = self.idx(node)
+            for family in metrics:
+                for sample in family.samples:
+                    if sample.name == "vectorized_cluster_partition_under_replicated_replicas":
+                        if counts[idx] is None:
+                            counts[idx] = 0
+                        counts[idx] += int(sample.value)
+        return all(map(lambda count: count == 0, counts.values()))
+
+    def node_id(self, node, force_refresh=False, timeout_sec=30):
+        pass
+
+    def partitions(self, topic_name=None):
+        """
+        Return partition metadata for the topic.
+        """
+        kc = KafkaCat(self)
+        md = kc.metadata()
+
+        result = []
+
+        def make_partition(topic_name, p):
+            index = p["partition"]
+            leader_id = p["leader"]
+            leader = None if leader_id == -1 else self.get_node(leader_id)
+            replicas = [self.get_node(r["id"]) for r in p["replicas"]]
+            return Partition(topic_name, index, leader, replicas)
+
+        for topic in md["topics"]:
+            if topic["topic"] == topic_name or topic_name is None:
+                result.extend(
+                    make_partition(topic["topic"], p)
+                    for p in topic["partitions"])
+
+        return result
+
+    def rolling_restart_nodes(self,
+                              nodes,
+                              override_cfg_params=None,
+                              start_timeout=None,
+                              stop_timeout=None,
+                              use_maintenance_mode=True,
+                              omit_seeds_on_idx_one=True):
+        nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
+        restarter = RollingRestarter(self)
+        restarter.restart_nodes(nodes,
+                                override_cfg_params=override_cfg_params,
+                                start_timeout=start_timeout,
+                                stop_timeout=stop_timeout,
+                                use_maintenance_mode=use_maintenance_mode,
+                                omit_seeds_on_idx_one=omit_seeds_on_idx_one)
+
+    def set_cluster_config(self,
+                           values: dict,
+                           expect_restart: bool = False,
+                           admin_client: Optional[Admin] = None,
+                           timeout: int = 10):
+        pass
+
+    def set_resource_settings(self, rs):
+        self._resource_settings = rs
+
+
+class RedpandaService(RedpandaServiceBase):
     PERSISTENT_ROOT = "/var/lib/redpanda"
     DATA_DIR = os.path.join(PERSISTENT_ROOT, "data")
     NODE_CONFIG_FILE = "/etc/redpanda/redpanda.yaml"
@@ -769,9 +963,9 @@ def __init__(self,
                  pandaproxy_config: Optional[PandaproxyConfig] = None,
                  schema_registry_config: Optional[SchemaRegistryConfig] = None,
                  disable_cloud_storage_diagnostics=False):
-        super(RedpandaService, self).__init__(context, num_nodes=num_brokers)
-        self._context = context
-        self._extra_rp_conf = extra_rp_conf or dict()
+        super(RedpandaService,
+              self).__init__(context, num_brokers, extra_rp_conf,
+                             resource_settings, si_settings)
         self._security = security
         self._installer: RedpandaInstaller = RedpandaInstaller(self)
         self._pandaproxy_config = pandaproxy_config
@@ -823,17 +1017,9 @@ def __init__(self,
 
         self._trim_logs = self._context.globals.get(self.TRIM_LOGS_KEY, True)
 
-        if resource_settings is None:
-            resource_settings = ResourceSettings()
-        self._resource_settings = resource_settings
         self.logger.info(
            f"ResourceSettings: dedicated_nodes={self._dedicated_nodes}")
 
-        if si_settings is not None:
-            self.set_si_settings(si_settings)
-        else:
-            self._si_settings = None
-
         # Disable saving cloud storage diagnostics. This may be useful for
         # tests that generate millions of objecst, as collecting diagnostics
         # may take a significant amount of time.
@@ -875,28 +1061,10 @@ def set_skip_if_no_redpanda_log(self, v: bool):
     def set_environment(self, environment: dict[str, str]):
         self._environment.update(environment)
 
-    def set_resource_settings(self, rs):
-        self._resource_settings = rs
-
-    def set_extra_rp_conf(self, conf):
-        self._extra_rp_conf = conf
-        if self._si_settings is not None:
-            self._extra_rp_conf = self._si_settings.update_rp_conf(
-                self._extra_rp_conf)
-
-    def set_si_settings(self, si_settings: SISettings):
-        si_settings.load_context(self.logger, self._context)
-        self._si_settings = si_settings
-        self._extra_rp_conf = self._si_settings.update_rp_conf(
-            self._extra_rp_conf)
-
     @property
     def si_settings(self):
         return self._si_settings
 
-    def add_extra_rp_conf(self, conf):
-        self._extra_rp_conf = {**self._extra_rp_conf, **conf}
-
     def set_extra_node_conf(self, node, conf):
         assert node in self.nodes, f"where node is {node.name}"
         self._extra_node_conf[node] = conf
@@ -2252,47 +2420,6 @@ def write_bootstrap_cluster_config(self):
             node.account.create_file(
                 RedpandaService.CLUSTER_BOOTSTRAP_CONFIG_FILE, conf_yaml)
 
-    def restart_nodes(self,
-                      nodes,
-                      override_cfg_params=None,
-                      start_timeout=None,
-                      stop_timeout=None,
-                      auto_assign_node_id=False,
-                      omit_seeds_on_idx_one=True):
-
-        nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
-        with concurrent.futures.ThreadPoolExecutor(
-                max_workers=len(nodes)) as executor:
-            # The list() wrapper is to cause futures to be evaluated here+now
-            # (including throwing any exceptions) and not just spawned in background.
-            list(
-                executor.map(lambda n: self.stop_node(n, timeout=stop_timeout),
-                             nodes))
-            list(
-                executor.map(
-                    lambda n: self.start_node(
-                        n,
-                        override_cfg_params,
-                        timeout=start_timeout,
-                        auto_assign_node_id=auto_assign_node_id,
-                        omit_seeds_on_idx_one=omit_seeds_on_idx_one), nodes))
-
-    def rolling_restart_nodes(self,
-                              nodes,
-                              override_cfg_params=None,
-                              start_timeout=None,
-                              stop_timeout=None,
-                              use_maintenance_mode=True,
-                              omit_seeds_on_idx_one=True):
-        nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
-        restarter = RollingRestarter(self)
-        restarter.restart_nodes(nodes,
-                                override_cfg_params=override_cfg_params,
-                                start_timeout=start_timeout,
-                                stop_timeout=stop_timeout,
-                                use_maintenance_mode=use_maintenance_mode,
-                                omit_seeds_on_idx_one=omit_seeds_on_idx_one)
-
     def get_node_by_id(self, node_id):
         """
         Returns a node that has requested id or None if node is not found
@@ -2558,40 +2685,6 @@ def schema_reg(self, limit=None) -> str:
         ]
         return ",".join(schema_reg)
 
-    def metrics(self,
-                node,
-                metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS):
-        assert node in self._started, f"where node is {node.name}"
-
-        metrics_endpoint = ("/metrics" if metrics_endpoint
-                            == MetricsEndpoint.METRICS else "/public_metrics")
-        url = f"http://{node.account.hostname}:9644{metrics_endpoint}"
-        resp = requests.get(url)
-        assert resp.status_code == 200
-        return text_string_to_metric_families(resp.text)
-
-    def metric_sum(self,
-                   metric_name,
-                   metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS,
-                   ns=None,
-                   topic=None):
-        """
-        Pings the 'metrics_endpoint' of each node and returns the summed values
-        of the given metric, optionally filtering by namespace and topic.
-        """
-        count = 0
-        for n in self.nodes:
-            metrics = self.metrics(n, metrics_endpoint=metrics_endpoint)
-            for family in metrics:
-                for sample in family.samples:
-                    if ns and sample.labels["namespace"] != ns:
-                        continue
-                    if topic and sample.labels["topic"] != topic:
-                        continue
-                    if sample.name == metric_name:
-                        count += int(sample.value)
-        return count
-
     def _extract_samples(self, metrics, sample_pattern: str,
                          node: ClusterNode) -> list[MetricSamples]:
         found_sample = None
@@ -2724,52 +2817,6 @@ def _try_get_node_id():
         self._node_id_by_idx[idx] = node_id
         return node_id
 
-    def healthy(self):
-        """
-        A primitive health check on all the nodes which returns True when all
-        nodes report that no under replicated partitions exist. This should
-        later be replaced by a proper / official start-up probe type check on
-        the health of a node after a restart.
-        """
-        counts = {self.idx(node): None for node in self.nodes}
-        for node in self.nodes:
-            try:
-                metrics = self.metrics(node)
-            except:
-                return False
-            idx = self.idx(node)
-            for family in metrics:
-                for sample in family.samples:
-                    if sample.name == "vectorized_cluster_partition_under_replicated_replicas":
-                        if counts[idx] is None:
-                            counts[idx] = 0
-                        counts[idx] += int(sample.value)
-        return all(map(lambda count: count == 0, counts.values()))
-
-    def partitions(self, topic_name=None):
-        """
-        Return partition metadata for the topic.
-        """
-        kc = KafkaCat(self)
-        md = kc.metadata()
-
-        result = []
-
-        def make_partition(topic_name, p):
-            index = p["partition"]
-            leader_id = p["leader"]
-            leader = None if leader_id == -1 else self.get_node(leader_id)
-            replicas = [self.get_node(r["id"]) for r in p["replicas"]]
-            return Partition(topic_name, index, leader, replicas)
-
-        for topic in md["topics"]:
-            if topic["topic"] == topic_name or topic_name is None:
-                result.extend(
-                    make_partition(topic["topic"], p)
-                    for p in topic["partitions"])
-
-        return result
-
     def cov_enabled(self):
         cov_option = self._context.globals.get(self.COV_KEY,
                                                self.DEFAULT_COV_OPT)