Skip to content

Commit

Permalink
Merge branch 'main' into opensearch-chart-option
Browse files Browse the repository at this point in the history
  • Loading branch information
topherinternational authored Dec 19, 2024
2 parents 2d5d433 + 65b110d commit cd3bfd0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
12 changes: 11 additions & 1 deletion task_sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,18 @@ def run(ti: RuntimeTaskInstance, log: Logger):
)

# TODO: Run task failure callbacks here
except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated):
except (AirflowTaskTimeout, AirflowException):
# TODO: handle the case of up_for_retry here
...
except AirflowTaskTerminated:
# External state updates are already handled with `ti_heartbeat` and will be
# updated already be another UI API. So, these exceptions should ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
# TODO: Run task failure callbacks here
except SystemExit:
...
except BaseException:
Expand Down
12 changes: 11 additions & 1 deletion task_sdk/tests/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import pytest
from uuid6 import uuid7

from airflow.exceptions import AirflowFailException, AirflowSensorTimeout, AirflowSkipException
from airflow.exceptions import (
AirflowFailException,
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
)
from airflow.sdk import DAG, BaseOperator, Connection
from airflow.sdk.api.datamodels._generated import TaskInstance, TerminalTIState
from airflow.sdk.execution_time.comms import (
Expand Down Expand Up @@ -352,6 +357,11 @@ def __init__(self, *args, **kwargs):
"sensor-timeout-exception",
AirflowSensorTimeout("Oops. Failing by AirflowSensorTimeout!"),
),
pytest.param(
"basic_failed3",
"task-terminated-exception",
AirflowTaskTerminated("Oops. Failing by AirflowTaskTerminated!"),
),
],
)
def test_run_basic_failed(time_machine, mocked_parse, dag_id, task_id, fail_with_exception, make_ti_context):
Expand Down

0 comments on commit cd3bfd0

Please sign in to comment.