Remove schedule_interval from OpenLineage provider
The schedule_interval key is kept in the facets for backward compatibility, but on
Airflow 3 it no longer produces a value in the resulting JSON.

A new key timetable_summary is added on Airflow 3 to replace the old
key, similar to how it's done in the REST API.
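
Concretely, for a daily timedelta schedule only the key carrying the summary changes
between versions. The values below are taken from this commit's test_adapter.py
expectations; the snippet is illustrative only and not part of the change:

# Illustrative excerpt of the facet's "dag" payload.
airflow_2_dag_excerpt = {"schedule_interval": "86400.0 seconds"}  # Airflow 2
airflow_3_dag_excerpt = {"timetable_summary": "1 day, 0:00:00"}  # Airflow 3 and up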
uranusjr committed Aug 26, 2024
1 parent 9ff606d commit 4686e04
Showing 8 changed files with 58 additions and 24 deletions.
4 changes: 4 additions & 0 deletions airflow/models/dag.py
@@ -1145,6 +1145,10 @@ def dag_id(self) -> str:
def dag_id(self, value: str) -> None:
self._dag_id = value

@property
def timetable_summary(self) -> str:
return self.timetable.summary

@property
def max_active_tasks(self) -> int:
return self._max_active_tasks
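The new property simply forwards to the DAG's timetable. A minimal usage sketch,
assuming an Airflow version that has this property and a daily timedelta schedule
(the DAG below is hypothetical):

import datetime

import pendulum
from airflow.models.dag import DAG

# Hypothetical DAG; timetable_summary delegates to dag.timetable.summary.
dag = DAG(
    dag_id="example_timetable_summary",
    schedule=datetime.timedelta(days=1),
    start_date=pendulum.datetime(2024, 6, 1, tz="UTC"),
)
print(dag.timetable_summary)  # "1 day, 0:00:00" for a one-day delta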
11 changes: 10 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
@@ -244,7 +244,16 @@ def _include_fields(self):
class DagInfo(InfoJsonEncodable):
"""Defines encoding DAG object to JSON."""

includes = ["dag_id", "description", "fileloc", "owner", "schedule_interval", "start_date", "tags"]
includes = [
"dag_id",
"description",
"fileloc",
"owner",
"schedule_interval", # For Airflow 2.
"timetable_summary", # For Airflow 3.
"start_date",
"tags",
]
casts = {"timetable": lambda dag: dag.timetable.serialize() if getattr(dag, "timetable", None) else None}
renames = {"_dag_id": "dag_id"}

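Listing both keys is safe because, per the commit message, an included attribute that
the DAG object does not have simply produces no value in the JSON. A rough sketch of
that pattern (hypothetical helper, not the provider's actual InfoJsonEncodable):

from types import SimpleNamespace


def encode_dag_info(dag, includes):
    # Keep only the attributes the running Airflow version actually provides, so
    # "schedule_interval" appears on Airflow 2 and "timetable_summary" on Airflow 3.
    return {name: str(getattr(dag, name)) for name in includes if hasattr(dag, name)}


fake_dag = SimpleNamespace(dag_id="example", timetable_summary="@once")  # Airflow-3-like object
print(encode_dag_info(fake_dag, ["dag_id", "schedule_interval", "timetable_summary"]))
# {'dag_id': 'example', 'timetable_summary': '@once'} -- no schedule_interval key is emitted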
@@ -430,7 +430,7 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like
with DAG(
dag_id="example_operator",
schedule_interval="@once",
schedule="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
task1 = BashOperator(
4 changes: 4 additions & 0 deletions newsfragments/41453.significant.rst
@@ -5,3 +5,7 @@ The ``schedule_interval`` and ``timetable`` arguments are removed from ``DAG``.
The ``schedule_interval`` *attribute* has also been removed. In the API, a new
``timetable_summary`` field has been added to replace ``schedule_interval`` for
presentation purposes.

Since the DAG object no longer has the ``schedule_interval`` attribute,
OpenLineage facets that contain the ``dag`` key produced on Airflow 3.0 or
later will also no longer contain the field.
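
For downstream code that reads these facets across Airflow versions, a hedged sketch of
handling either key (the key names come from this commit; the shape of the facet's dag
dict is taken from the provider tests below):

def dag_schedule_summary(dag_info: dict) -> str | None:
    # Prefer the Airflow 3 key and fall back to the Airflow 2 one; only one of
    # them is present, depending on the version that emitted the event.
    return dag_info.get("timetable_summary", dag_info.get("schedule_interval"))


# e.g. dag_schedule_summary({"dag_id": "dag", "timetable_summary": "@once"}) == "@once"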
1 change: 1 addition & 0 deletions tests/api_connexion/schemas/test_dag_schema.py
@@ -187,6 +187,7 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer):
"start_date": "2020-06-19T00:00:00+00:00",
"tags": [{"name": "example1"}, {"name": "example2"}],
"template_searchpath": None,
"timetable_summary": "1 day, 0:00:00",
"timezone": UTC_JSON_REPR,
"max_active_runs": 16,
"max_consecutive_failed_dag_runs": 0,
10 changes: 8 additions & 2 deletions tests/providers/fab/auth_manager/test_security.py
@@ -513,7 +513,10 @@ def test_get_accessible_dag_ids(mock_is_logged_in, app, security_manager, sessio
],
) as user:
mock_is_logged_in.return_value = True
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", timetable_summary="2 2 * * *")
if hasattr(DagModel, "schedule_interval"): # Airflow 2 compat.
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
else: # Airflow 3.
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", timetable_summary="2 2 * * *")
session.add(dag_model)
session.commit()

@@ -544,7 +547,10 @@ def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(
],
) as user:
mock_is_logged_in.return_value = True
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", timetable_summary="2 2 * * *")
if hasattr(DagModel, "schedule_interval"): # Airflow 2 compat.
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
else: # Airflow 3.
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", timetable_summary="2 2 * * *")
session.add(dag_model)
session.commit()

25 changes: 15 additions & 10 deletions tests/providers/openlineage/plugins/test_adapter.py
@@ -572,6 +572,20 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
job_facets=job_facets,
)

expected_dag_info = {
"timetable": {"delta": 86400.0},
"dag_id": dag_id,
"description": "dag desc",
"owner": "airflow",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": [],
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
expected_dag_info["schedule_interval"] = "86400.0 seconds"
else: # Airflow 3 and up.
expected_dag_info["timetable_summary"] = "1 day, 0:00:00"

assert len(client.emit.mock_calls) == 1
assert (
call(
@@ -586,16 +600,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
nominalEndTime=event_time.isoformat(),
),
"airflowDagRun": AirflowDagRunFacet(
dag={
"timetable": {"delta": 86400.0},
"dag_id": dag_id,
"description": "dag desc",
"owner": "airflow",
"schedule_interval": "86400.0 seconds",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": [],
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
},
dag=expected_dag_info,
dagRun={
"conf": {},
"dag_id": "dag_id",
25 changes: 15 additions & 10 deletions tests/providers/openlineage/utils/test_utils.py
@@ -134,18 +134,23 @@ def test_get_airflow_dag_run_facet():
dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)

result = get_airflow_dag_run_facet(dagrun_mock)

expected_dag_info = {
"dag_id": "dag",
"description": None,
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "airflow",
"timetable": {},
"start_date": "2024-06-01T00:00:00+00:00",
"tags": ["test"],
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
expected_dag_info["schedule_interval"] = "@once"
else: # Airflow 3 and up.
expected_dag_info["timetable_summary"] = "@once"
assert result == {
"airflowDagRun": AirflowDagRunFacet(
dag={
"dag_id": "dag",
"description": None,
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "airflow",
"timetable": {},
"schedule_interval": "@once",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": ["test"],
},
dag=expected_dag_info,
dagRun={
"conf": {},
"dag_id": "dag",
