-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: parse_dag_directory take a path
- Loading branch information
1 parent
07b23ac
commit f57c27f
Showing
7 changed files
with
179 additions
and
149 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from .collection import ( | ||
Collection, | ||
MissingMandatoryDirectoryError, | ||
PathDoesNotExistsError, | ||
PathIsNotADirectoryError, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
from collections.abc import Generator | ||
from pathlib import Path | ||
|
||
import yaml | ||
from pydantic import BaseModel, ConfigDict, ValidationError | ||
|
||
from tdp.core.constants import ( | ||
YML_EXTENSION, | ||
) | ||
|
||
try: | ||
from yaml import CLoader as Loader | ||
except ImportError: | ||
from yaml import Loader | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def read_dag_directory(
    directory_path: Path,
) -> Generator[TDPLibDagNodeModel, None, None]:
    """Yield the DAG nodes defined by every DAG file of a directory.

    Only entries located directly inside ``directory_path`` and matching
    ``"*" + YML_EXTENSION`` are read; sub-directories are not searched.

    Args:
        directory_path: Path to the DAG directory.

    Yields:
        The DAG nodes of each matching file, as produced by ``read_dag_file``.
    """
    # ``Path.glob`` gives no ordering guarantee; sorting keeps the node order
    # (and which invalid file gets reported first) stable across platforms.
    for dag_file in sorted(directory_path.glob("*" + YML_EXTENSION)):
        yield from read_dag_file(dag_file)
|
||
|
||
class TDPLibDagNodeModel(BaseModel):
    """Describe one operation entry of a tdp_lib_dag file.

    Keys other than the declared fields are ignored during validation.
    """

    # Unknown keys of an entry are dropped instead of being rejected.
    model_config = ConfigDict(extra="ignore")

    # Mandatory name of the operation.
    name: str
    # Entries listed under ``depends_on``; empty when the key is omitted.
    depends_on: list[str] = []
|
||
|
||
class TDPLibDagModel(BaseModel):
    """Describe the whole content of a tdp_lib_dag file.

    Keys other than the declared fields are ignored during validation.
    """

    # Unknown keys are dropped instead of being rejected.
    model_config = ConfigDict(extra="ignore")

    # Mandatory list of operation entries, each validated as a
    # ``TDPLibDagNodeModel``.
    operations: list[TDPLibDagNodeModel]
|
||
|
||
def read_dag_file(
    file_path: Path,
) -> Generator[TDPLibDagNodeModel, None, None]:
    """Read a tdp_lib_dag file and yield its DAG operations.

    The parsed YAML content is validated as the ``operations`` list of a
    ``TDPLibDagModel``. Being a generator, nothing is read before iteration
    starts.

    Args:
        file_path: Path to the tdp_lib_dag file.

    Yields:
        The validated operations, in file order.

    Raises:
        ValidationError: If the content is not a valid list of operations
            (an empty file included). The error is logged before being
            re-raised.
    """
    # Read as UTF-8 explicitly so parsing does not depend on the locale's
    # default encoding.
    with file_path.open("r", encoding="utf-8") as operations_file:
        # NOTE(review): ``Loader`` is PyYAML's full loader, which can build
        # arbitrary Python objects. Only feed it trusted files, or consider
        # switching to a safe loader.
        file_content = yaml.load(operations_file, Loader=Loader)

    # Only the model construction can raise ValidationError, so the ``try``
    # is kept around that single statement.
    try:
        tdp_lib_dag = TDPLibDagModel(operations=file_content)
    except ValidationError as e:
        logger.error(f"Error while parsing tdp_lib_dag file {file_path}: {e}")
        raise

    yield from tdp_lib_dag.operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Copyright 2022 TOSIT.IO | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from pathlib import Path | ||
|
||
import pytest | ||
from pydantic import ValidationError | ||
|
||
from tdp.core.collection.read_dag import read_dag_directory, read_dag_file | ||
|
||
|
||
def test_read_dag_file(tmp_path: Path):
    """Operations are yielded in file order with their dependencies."""
    dag_file_path = tmp_path / "dag_file.yml"
    # ``depends_on`` must be nested under its list item for the YAML to be
    # a valid list of operations.
    dag_file_path.write_text(
        """---
- name: s1_c1_a
  depends_on:
    - sx_cx_a
- name: s2_c2_a
  depends_on:
    - s1_c1_a
- name: s3_c3_a
  depends_on:
    - sx_cx_a
    - sy_cy_a
"""
    )
    operations = list(read_dag_file(dag_file_path))
    assert len(operations) == 3
    assert operations[0].name == "s1_c1_a"
    assert operations[0].depends_on == ["sx_cx_a"]
    assert operations[1].name == "s2_c2_a"
    assert operations[1].depends_on == ["s1_c1_a"]
    assert operations[2].name == "s3_c3_a"
    assert operations[2].depends_on == ["sx_cx_a", "sy_cy_a"]
|
||
|
||
def test_read_dag_file_empty(tmp_path: Path):
    """An empty DAG file is rejected with a ValidationError."""
    empty_dag_file = tmp_path / "dag_file.yml"
    empty_dag_file.write_text("")
    # The generator is lazy: it has to be consumed for the file to be parsed.
    with pytest.raises(ValidationError):
        list(read_dag_file(empty_dag_file))
|
||
|
||
def test_read_dag_file_with_additional_props(tmp_path: Path):
    """Unknown keys of an operation entry are ignored."""
    dag_file_path = tmp_path / "dag_file.yml"
    # ``foo`` is an extra key sitting next to ``name`` and ``depends_on``
    # inside the same list item.
    dag_file_path.write_text(
        """---
- name: s1_c1_a
  depends_on:
    - sx_cx_a
  foo: bar
"""
    )
    operations = list(read_dag_file(dag_file_path))
    assert len(operations) == 1
    assert operations[0].name == "s1_c1_a"
    assert operations[0].depends_on == ["sx_cx_a"]
|
||
|
||
def test_get_collection_dag_nodes(tmp_path: Path):
    """Nodes of every DAG file of the directory are collected."""
    collection_path = tmp_path / "collection"
    dag_directory = "dag"
    (dag_directory_path := collection_path / dag_directory).mkdir(
        parents=True, exist_ok=True
    )
    dag_file_1 = dag_directory_path / "dag1.yml"
    dag_file_2 = dag_directory_path / "dag2.yml"
    # ``depends_on`` must be nested under its list item for the YAML to be
    # a valid list of operations.
    dag_file_1.write_text(
        """---
- name: s1_c1_a
  depends_on:
    - sx_cx_a
"""
    )
    dag_file_2.write_text(
        """---
- name: s2_c2_a
  depends_on:
    - s1_c1_a
"""
    )
    dag_nodes = list(read_dag_directory(dag_directory_path))
    assert len(dag_nodes) == 2
    # Membership checks keep the test independent of the file read order.
    assert any(
        node.name == "s1_c1_a" and node.depends_on == ["sx_cx_a"] for node in dag_nodes
    )
    assert any(
        node.name == "s2_c2_a" and node.depends_on == ["s1_c1_a"] for node in dag_nodes
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters