Skip to content

Commit c66e893

Browse files
dstandishLefteris Gilmaz
authored and
Lefteris Gilmaz
committed
Use FOR KEY SHARE UPDATE instead of FOR UPDATE (apache#42082)
Generally speaking, the goal of our SELECT FOR UPDATE statements is concurrency control. We want to make sure that nothing else is handling the dag run (or ti) at the same time. In no case that I'm aware of, are we actually going to update the PK value for the record. So we can take the weaker lock, and still have that concurrency control. But this only works on postgres. Mysql has no such weaker for update lock. One negative consequence of taking the stronger lock, is it blocks any insert into a table of a row that references the locked row in a FK. This can cause slowdowns and pileups in some circumstances. In one case we do an extra FOR UPDATE lock because that FK wait issue was associated with a deadlock. This change makes that extra unnecessary in the postgres case so we remove it here. And with the mysql case in mind, we add a commit to avoid the deadlocking.
1 parent d0481f3 commit c66e893

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

airflow/models/taskinstance.py

+10-19
Original file line numberDiff line numberDiff line change
@@ -1673,8 +1673,6 @@ def _handle_reschedule(
16731673

16741674
ti = _coalesce_to_orm_ti(ti=ti, session=session)
16751675

1676-
from airflow.models.dagrun import DagRun # Avoid circular import
1677-
16781676
ti.refresh_from_db(session)
16791677

16801678
if TYPE_CHECKING:
@@ -1683,16 +1681,16 @@ def _handle_reschedule(
16831681
ti.end_date = timezone.utcnow()
16841682
ti.set_duration()
16851683

1686-
# Lock DAG run to be sure not to get into a deadlock situation when trying to insert
1687-
# TaskReschedule which apparently also creates lock on corresponding DagRun entity
1688-
with_row_locks(
1689-
session.query(DagRun).filter_by(
1690-
dag_id=ti.dag_id,
1691-
run_id=ti.run_id,
1692-
),
1693-
session=session,
1694-
).one()
1695-
# Log reschedule request
1684+
# set state
1685+
ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
1686+
1687+
ti.clear_next_method_args()
1688+
1689+
session.merge(ti)
1690+
session.commit()
1691+
1692+
# we add this in separate commit to reduce likelihood of deadlock
1693+
# see https://github.com/apache/airflow/pull/21362 for more info
16961694
session.add(
16971695
TaskReschedule(
16981696
ti.task_id,
@@ -1705,13 +1703,6 @@ def _handle_reschedule(
17051703
ti.map_index,
17061704
)
17071705
)
1708-
1709-
# set state
1710-
ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
1711-
1712-
ti.clear_next_method_args()
1713-
1714-
session.merge(ti)
17151706
session.commit()
17161707
return ti
17171708

airflow/utils/sqlalchemy.py

+4
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ def with_row_locks(
312312
*,
313313
nowait: bool = False,
314314
skip_locked: bool = False,
315+
key_share: bool = True,
315316
**kwargs,
316317
) -> Query:
317318
"""
@@ -329,6 +330,7 @@ def with_row_locks(
329330
:param session: ORM Session
330331
:param nowait: If set to True, will pass NOWAIT to supported database backends.
331332
:param skip_locked: If set to True, will pass SKIP LOCKED to supported database backends.
333+
:param key_share: If true, will lock with FOR KEY SHARE UPDATE (at least on postgres).
332334
:param kwargs: Extra kwargs to pass to with_for_update (of, nowait, skip_locked, etc)
333335
:return: updated query
334336
"""
@@ -343,6 +345,8 @@ def with_row_locks(
343345
kwargs["nowait"] = True
344346
if skip_locked:
345347
kwargs["skip_locked"] = True
348+
if key_share:
349+
kwargs["key_share"] = True
346350
return query.with_for_update(**kwargs)
347351

348352

tests/utils/test_sqlalchemy.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def test_with_row_locks(
147147
returned_value = with_row_locks(query=query, session=session, nowait=True)
148148

149149
if expected_use_row_level_lock:
150-
query.with_for_update.assert_called_once_with(nowait=True)
150+
query.with_for_update.assert_called_once_with(nowait=True, key_share=True)
151151
else:
152152
assert returned_value == query
153153
query.with_for_update.assert_not_called()

0 commit comments

Comments
 (0)