From aaf29ee54bd8666a8dc3a129aed213f6b3b31bde Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 16 Dec 2024 15:20:23 +0800 Subject: [PATCH] Remove reparse on DAG depending on asset alias (#44866) I suspect we no longer need this since alias resolution now happens very late in the scheduling process. A DAG *should* be able to automatically understand the alias is resolved without a reparse after refactorings introduced in cccc9334e3123423f678c7d237c544b45a76743e. --- airflow/assets/manager.py | 5 ----- tests/assets/test_manager.py | 2 -- 2 files changed, 7 deletions(-) diff --git a/airflow/assets/manager.py b/airflow/assets/manager.py index 364d01607e5c4..99de69176a878 100644 --- a/airflow/assets/manager.py +++ b/airflow/assets/manager.py @@ -174,11 +174,6 @@ def register_asset_change( if alias_ref.dag.is_active and not alias_ref.dag.is_paused } - dags_to_reparse = dags_to_queue_from_asset_alias - dags_to_queue_from_asset - if dags_to_reparse: - file_locs = {dag.fileloc for dag in dags_to_reparse} - cls._send_dag_priority_parsing_request(file_locs, session) - cls.notify_asset_changed(asset=asset) Stats.incr("asset.updates") diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py index afbbfc23adee1..b47dae7f2e7f1 100644 --- a/tests/assets/test_manager.py +++ b/tests/assets/test_manager.py @@ -34,7 +34,6 @@ DagScheduleAssetReference, ) from airflow.models.dag import DagModel -from airflow.models.dagbag import DagPriorityParsingRequest from airflow.sdk.definitions.asset import Asset, AssetAlias from tests.listeners import asset_listener @@ -139,7 +138,6 @@ def test_register_asset_change_with_alias(self, session, dag_maker, mock_task_in # Ensure we've created an asset assert session.query(AssetEvent).filter_by(asset_id=asm.id).count() == 1 assert session.query(AssetDagRunQueue).count() == 2 - assert session.query(DagPriorityParsingRequest).count() == 2 def test_register_asset_change_no_downstreams(self, session, mock_task_instance): asset_manager = AssetManager()