fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets (#41398)

* fix(datasets/manager): fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets

this happens when dynamic task mapping is used

* refactor(dataset/manager): reword debug log

Co-authored-by: Ephraim Anierobi <[email protected]>

* refactor(dataset/manager): remove unnecessary logging

Co-authored-by: Ephraim Anierobi <[email protected]>

---------

Co-authored-by: Ephraim Anierobi <[email protected]>
(cherry picked from commit bf64cb6)
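
A note on the technique: the slow path added below wraps each DagPriorityParsingRequest write in a SQLAlchemy savepoint (session.begin_nested()), so a duplicate fileloc only rolls back that savepoint instead of poisoning the whole transaction. Here is a minimal, self-contained sketch of that pattern — not Airflow code; the ParsingRequest model, the file paths, and the in-memory SQLite engine are illustrative stand-ins:

from sqlalchemy import Column, String, create_engine, exc
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ParsingRequest(Base):  # hypothetical stand-in for DagPriorityParsingRequest
    __tablename__ = "parsing_request"
    fileloc = Column(String, primary_key=True)  # the unique constraint that can conflict


engine = create_engine("sqlite://")  # in-memory database, only for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(ParsingRequest(fileloc="/dags/a.py"))  # pretend this request already exists
    session.flush()

    for fileloc in ("/dags/a.py", "/dags/b.py"):
        try:
            with session.begin_nested():  # SAVEPOINT: a failure rolls back only this block
                session.add(ParsingRequest(fileloc=fileloc))
                session.flush()
        except exc.IntegrityError:
            print(f"skipping {fileloc}, already requested")
    session.commit()
    print(session.query(ParsingRequest).count())  # 2 -- the duplicate did not abort the batch

Running it prints the skip message for /dags/a.py and a final count of 2; without the savepoint the IntegrityError would invalidate the enclosing transaction.
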
Lee-W authored and utkarsharma2 committed Aug 12, 2024
1 parent 8ea4eb1 commit e001b88
Showing 1 changed file with 31 additions and 4 deletions.
35 changes: 31 additions & 4 deletions airflow/datasets/manager.py
@@ -140,10 +140,8 @@ def register_dataset_change(

         dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset
         if dags_to_reparse:
-            session.add_all(
-                DagPriorityParsingRequest(fileloc=fileloc)
-                for fileloc in {dag.fileloc for dag in dags_to_reparse}
-            )
+            file_locs = {dag.fileloc for dag in dags_to_reparse}
+            cls._send_dag_priority_parsing_request(file_locs, session)
         session.flush()

         cls.notify_dataset_changed(dataset=dataset)
@@ -208,6 +206,35 @@ def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel],
         stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
         session.execute(stmt, values)

+    @classmethod
+    def _send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        if session.bind.dialect.name == "postgresql":
+            return cls._postgres_send_dag_priority_parsing_request(file_locs, session)
+        return cls._slow_path_send_dag_priority_parsing_request(file_locs, session)
+
+    @classmethod
+    def _slow_path_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None:
+            # Don't error whole transaction when a single DagPriorityParsingRequest item conflicts.
+            # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
+            req = DagPriorityParsingRequest(fileloc=fileloc)
+            try:
+                with session.begin_nested():
+                    session.merge(req)
+            except exc.IntegrityError:
+                cls.logger().debug("Skipping request %s, already present", req, exc_info=True)
+                return None
+            return req.fileloc
+
+        for fileloc in file_locs:
+            _send_dag_priority_parsing_request_if_needed(fileloc)
+
+    @classmethod
+    def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
+        from sqlalchemy.dialects.postgresql import insert
+
+        stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing()
+        session.execute(stmt, [{"fileloc": fileloc} for fileloc in file_locs])
+

 def resolve_dataset_manager() -> DatasetManager:
     """Retrieve the dataset manager."""
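
As a rough illustration of the fast path above — a single dialect-level INSERT ... ON CONFLICT DO NOTHING executed with one parameter dict per fileloc — here is a sketch that is not Airflow code: it swaps in SQLite's insert() (same on_conflict_do_nothing() API, SQLite 3.24+) and a stand-in ParsingRequest model so it runs without a PostgreSQL server:

from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.sqlite import insert  # the committed code uses sqlalchemy.dialects.postgresql
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ParsingRequest(Base):  # hypothetical stand-in for DagPriorityParsingRequest
    __tablename__ = "parsing_request"
    fileloc = Column(String, primary_key=True)


engine = create_engine("sqlite://")  # in-memory database, only for the demo
Base.metadata.create_all(engine)

file_locs = {"/dags/a.py", "/dags/b.py"}
with Session(engine) as session:
    session.add(ParsingRequest(fileloc="/dags/a.py"))  # pretend this request already exists
    session.flush()

    stmt = insert(ParsingRequest).on_conflict_do_nothing()
    session.execute(stmt, [{"fileloc": fileloc} for fileloc in file_locs])  # one dict per row
    session.commit()
    print(session.query(ParsingRequest).count())  # 2 -- the conflicting row was skipped silently

Either dialect keeps the semantics the same: filelocs that already have a DagPriorityParsingRequest row are skipped in the database rather than raising a unique constraint error in Python.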
