From 4f2ad367640fa1c83dcc54fa5a30d73a0d96fc32 Mon Sep 17 00:00:00 2001 From: SteBaum Date: Thu, 8 Aug 2024 10:58:10 +0200 Subject: [PATCH] fix: get rid of red error marking in Dag class --- tdp/core/dag.py | 50 +++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/tdp/core/dag.py b/tdp/core/dag.py index 8684bad3..52d9826d 100644 --- a/tdp/core/dag.py +++ b/tdp/core/dag.py @@ -20,6 +20,7 @@ import networkx as nx from tdp.core.constants import DEFAULT_SERVICE_PRIORITY, SERVICE_PRIORITY +from tdp.core.entities.operation import Operations from tdp.core.operation import Operation if TYPE_CHECKING: @@ -50,7 +51,7 @@ def __init__(self, collections: Collections): self._graph = self._generate_graph(self.operations) @property - def operations(self) -> dict[str, Operation]: + def operations(self) -> Operations: """DAG operations dictionary.""" return self._operations @@ -239,7 +240,7 @@ def get_operation_descendants( ) # TODO: can take a list of operations instead of a dict - def _generate_graph(self, nodes: dict[str, Operation]) -> nx.DiGraph: + def _generate_graph(self, nodes: Operations) -> nx.DiGraph: DG = nx.DiGraph() for operation_name, operation in nodes.items(): DG.add_node(operation_name) @@ -258,7 +259,7 @@ def _generate_graph(self, nodes: dict[str, Operation]) -> nx.DiGraph: # TODO: call this method inside of Collections._init_operations instead of the Dag constructor # TODO: remove Collections dependency -def validate_dag_nodes(nodes: dict[str, Operation], collections: Collections) -> None: +def validate_dag_nodes(nodes: Operations, collections: Collections) -> None: r"""Validation rules : - \*_start operations can only be required from within its own service - \*_install operations should only depend on other \*_install operations @@ -274,28 +275,29 @@ def warning(collection_name: str, message: str) -> None: logger.warning(message + f", collection: {collection_name}") for operation_name, operation in nodes.items(): - c_warning = functools.partial(warning, operation.collection_name) - for dependency in operation.depends_on: - # *_start operations can only be required from within its own service - dependency_service = nodes[dependency].service_name - if ( - dependency.endswith("_start") - and dependency_service != operation.service_name - ): - c_warning( - f"Operation '{operation_name}' is in service '{operation.service_name}', depends on " - f"'{dependency}' which is a start action in service '{dependency_service}' and should " - f"only depends on start action within its own service" - ) + if operation.collection_name is not None: + c_warning = functools.partial(warning, operation.collection_name) + for dependency in operation.depends_on: + # *_start operations can only be required from within its own service + dependency_service = nodes[dependency].service_name + if ( + dependency.endswith("_start") + and dependency_service != operation.service_name + ): + c_warning( + f"Operation '{operation_name}' is in service '{operation.service_name}', depends on " + f"'{dependency}' which is a start action in service '{dependency_service}' and should " + f"only depends on start action within its own service" + ) - # *_install operations should only depend on other *_install operations - if operation_name.endswith("_install") and not dependency.endswith( - "_install" - ): - c_warning( - f"Operation '{operation_name}' is an install action, depends on '{dependency}' which is " - f"not an install action and should only depends on other install action" - ) + # *_install operations should only depend on other *_install operations + if operation_name.endswith("_install") and not dependency.endswith( + "_install" + ): + c_warning( + f"Operation '{operation_name}' is an install action, depends on '{dependency}' which is " + f"not an install action and should only depends on other install action" + ) # Each service (HDFS, HBase, Hive, etc) should have *_install, *_config, *_init and *_start actions # even if they are "empty" (tagged with noop)