Skip to content

Commit

Permalink
fix: associate correct collection name to operation
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulFarault committed Feb 20, 2024
1 parent e296ac9 commit d52fe14
Showing 1 changed file with 63 additions and 33 deletions.
96 changes: 63 additions & 33 deletions tdp/core/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,76 +143,106 @@ def _init_operations(
dag_operations: dict[str, Operation] = {}
other_operations: dict[str, Operation] = {}

# Init DAG Operations
for collection in collections.values():
# Load DAG operations from the dag files
for dag_file in collection.dag_yamls:
for read_operation in read_tdp_lib_dag_file(dag_file):
if read_operation.name in dag_operations:
# Merge the operation with the existing one
logger.debug(
f"DAG Operation '{read_operation.name}' defined in collection "
f"'{dag_operations[read_operation.name].collection_name}' "
f"is merged with collection '{collection.name}'"
existing_operation = dag_operations.get(read_operation.name)

# The read_operation is associated with a playbook defined in the
# current collection
if _ := collection.playbooks.get(read_operation.name):
# TODO: would be nice to dissociate the Operation class from the playbook and store the playbook in the Operation
dag_operation_to_register = Operation(
name=read_operation.name,
collection_name=collection.name,
host_names=collection.get_hosts_from_playbook(
read_operation.name
),
depends_on=read_operation.depends_on.copy(),
)
dag_operations[read_operation.name].depends_on.extend(
read_operation.depends_on
# If the operation is already registered, merge its dependencies
if existing_operation:
dag_operation_to_register.depends_on.extend(
dag_operations[read_operation.name].depends_on
)
# Log a debug message if we override a playbook operation
if not existing_operation.noop:
logger.debug(
f"'{read_operation.name}' defined in "
f"'{existing_operation.collection_name}' "
f"is overridden by '{collection.name}'"
)
# Register the operation
dag_operations[read_operation.name] = dag_operation_to_register
continue

# The read_operation is already registered
if existing_operation:
logger.debug(
f"'{read_operation.name}' defined in "
f"'{existing_operation.collection_name}' "
f"is extended by '{collection.name}'"
)
existing_operation.depends_on.extend(read_operation.depends_on)
continue

# Create the operation
# From this point, the read_operation is a noop as it is defined neither
# in the current collection nor in the previous ones

# Create and register the operation
dag_operations[read_operation.name] = Operation(
name=read_operation.name,
collection_name=collection.name, # TODO: this is the collection that defines the DAG where the operation is defined, not the collection that defines the operation
depends_on=read_operation.depends_on,
noop=read_operation.noop,
host_names=(
None
if read_operation.noop
else collection.get_hosts_from_playbook(read_operation.name)
),
collection_name=collection.name,
depends_on=read_operation.depends_on.copy(),
noop=True,
host_names=None,
)
# 'restart' and 'stop' operations are not defined in the DAG for
# noop, they need to be generated from the start operations.
if read_operation.noop and read_operation.name.endswith("_start"):
# 'restart' and 'stop' operations are not defined in the DAG file
# for noop, they need to be generated from the start operations
if read_operation.name.endswith("_start"):
logger.debug(
f"DAG Operation '{read_operation.name}' is noop, "
f"creating the associated restart and stop operations."
f"'{read_operation.name}' is noop, creating the associated "
"restart and stop operations"
)
# Create and store the restart operation
# Create and register the restart operation
restart_operation_name = read_operation.name.replace(
"_start", "_restart"
)
other_operations[restart_operation_name] = Operation(
name=restart_operation_name,
collection_name="replace_restart_noop",
depends_on=read_operation.depends_on,
depends_on=read_operation.depends_on.copy(),
noop=True,
host_names=None,
)
# Create and store the stop operation
# Create and register the stop operation
stop_operation_name = read_operation.name.replace(
"_start", "_stop"
)
other_operations[stop_operation_name] = Operation(
name=stop_operation_name,
collection_name="replace_stop_noop",
depends_on=read_operation.depends_on,
depends_on=read_operation.depends_on.copy(),
noop=True,
host_names=None,
)

# Init Operations not in the DAG
# The two for loops cannot be merged: a playbook operation may be defined in a
# first collection without being used in its DAG, and then be used in the DAG
# of a second collection.
for collection in collections.values():
for operation_name, _ in collection.playbooks.items():
# Load playbook operations to complete the operations list with the
# operations that are not defined in the DAG files
for operation_name in collection.playbooks:
if operation_name in dag_operations:
continue
if operation_name in other_operations:
logger.info(
f"Operation '{operation_name}' defined in collection "
logger.debug(
f"'{operation_name}' defined in "
f"'{other_operations[operation_name].collection_name}' "
f"is overridden by collection '{collection.name}'"
f"is overridden by '{collection.name}'"
)

other_operations[operation_name] = Operation(
name=operation_name,
host_names=collection.get_hosts_from_playbook(operation_name),
Expand Down

0 comments on commit d52fe14

Please sign in to comment.