Skip to content

Commit

Permalink
fix(dag): avoid getting dataset next run info for unresolved dataset …
Browse files Browse the repository at this point in the history
…alias (apache#41828)
  • Loading branch information
Lee-W authored and joaopamaral committed Oct 21, 2024
1 parent 8edbf5c commit cfd89f9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
5 changes: 4 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3274,7 +3274,10 @@ def calculate_dagrun_date_fields(
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
if self.dataset_expression is None:
return None
return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]

# When a dataset alias does not resolve into datasets, get_dataset_triggered_next_run_info returns
# an empty dict as there's no dataset info to get. This method should thus return None.
return get_dataset_triggered_next_run_info([self.dag_id], session=session).get(self.dag_id, None)


# NOTE: Please keep the list of arguments in sync with DAG.__init__.
Expand Down
4 changes: 3 additions & 1 deletion airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
:meta private:
"""

UNRESOLVED_ALIAS_SUMMARY = "Unresolved DatasetAlias"

description: str = "Triggered by datasets"

def __init__(self, datasets: BaseDataset) -> None:
Expand All @@ -170,7 +172,7 @@ def __init__(self, datasets: BaseDataset) -> None:
self.dataset_condition = _DatasetAliasCondition(self.dataset_condition.name)

if not next(self.dataset_condition.iter_datasets(), False):
self._summary = "Unresolved DatasetAlias"
self._summary = DatasetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
else:
self._summary = "Dataset"

Expand Down
16 changes: 16 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3433,6 +3433,22 @@ def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
}


@pytest.mark.need_serialized_dag
def test_get_dataset_triggered_next_run_info_with_unresolved_dataset_alias(dag_maker, clear_datasets):
dataset_alias1 = DatasetAlias(name="alias")
with dag_maker(dag_id="dag-1", schedule=[dataset_alias1]):
pass
dag1 = dag_maker.dag
session = dag_maker.session
session.flush()

info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
assert info == {}

dag1_model = DagModel.get_dagmodel(dag1.dag_id)
assert dag1_model.get_dataset_triggered_next_run_info(session=session) is None


def test_dag_uses_timetable_for_run_id(session):
class CustomRunIdTimetable(Timetable):
def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str:
Expand Down

0 comments on commit cfd89f9

Please sign in to comment.