Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Handling up_for_retry task instance states for AirflowTaskTimeout and AirflowException #44981

Closed
wants to merge 10 commits into from

Conversation

amoghrajesh
Copy link
Contributor

Only last 2 commits are relevant.

Dependent on #44977 and hence on #44954.
Handling the case of up_for_retry from the task SDK.

This exception can be thrown in multiple cases, two valid examples are:

  1. I have a dag which times out in 1 second but the task sleeps for 100 seconds, it will end up throwing the AirflowTaskTimeout exception:
import sys
from time import sleep

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

from airflow.sdk.definitions.baseoperator import AirflowException


def print_hello():
    sleep(100)

with DAG(
    dag_id="hello_world_single_task",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "retries": 1,
    },
    description="A simple Hello World DAG with one task",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    hello_task = PythonOperator(
        retries=1,
        task_id="say_hello",
        execution_timeout=timedelta(seconds=1),
        python_callable=print_hello,
    )

Should be marked with up_for_retry
image

  1. A task raises the AirflowException
    Example: If we are unable to render the pod template for K8s while using K8sExecutor with Airflow.
@provide_session
def get_rendered_k8s_spec(task_instance: TaskInstance, session=NEW_SESSION) -> dict | None:
    """Fetch rendered template fields from DB."""
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(task_instance, session=session)
    if not rendered_k8s_spec:
        try:
            rendered_k8s_spec = render_k8s_pod_yaml(task_instance)
        except (TemplateAssertionError, UndefinedError) as e:
            raise AirflowException(f"Unable to render a k8s spec for this taskinstance: {e}") from e
    return rendered_k8s_spec

Key changes:

  1. This PR adds the up_for_retry state into TerminalTIState as it is a terminal state and on hitting this state, anything additional work apart from marking it to that state needn't be done.
  2. Called the API from task runner when AirflowTaskTimeout, AirflowException is raised.

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh requested review from kaxil and ashb and removed request for ephraimbuddy, pierrejeambrun and kaxil December 17, 2024 09:58
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Dec 17, 2024
@@ -80,7 +80,7 @@ class TaskInstanceState(str, Enum):
SUCCESS = TerminalTIState.SUCCESS # Task completed
RESTARTING = IntermediateTIState.RESTARTING # External request to restart (e.g. cleared when running)
FAILED = TerminalTIState.FAILED # Task errored out
UP_FOR_RETRY = IntermediateTIState.UP_FOR_RETRY # Task failed but has retries left
UP_FOR_RETRY = TerminalTIState.UP_FOR_RETRY # Task failed but has retries left
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be IntermediateTIState, i.e. the task will be "retried" as opposed to success, failed, skipped etc -- where it TI is completed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in DM we discussed UPSTREAM_FAILED should be TerminalTIState not UP_FOR_RETRY

@@ -39,6 +39,7 @@ class TerminalTIState(str, Enum):
FAILED = "failed"
SKIPPED = "skipped" # A user can raise a AirflowSkipException from a task & it will be marked as skipped
REMOVED = "removed"
UP_FOR_RETRY = "up_for_retry" # We do not need to do anything actionable for this state, hence it is a terminal state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"terminal state" means the task is done; up for retry does not really feel like a terminal state.... since it's going to be retried....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this highlights an ambiguity / conflict. the TI-try is done, but the TI is not

@amoghrajesh
Copy link
Contributor Author

Thanks @kaxil and @dstandish. I got confused here with similar names "up_for_retry" and "upstream_failed". Will take a look and redo it tomorrow. Thanks

@amoghrajesh
Copy link
Contributor Author

Closing this PR as it is easier to create a new one

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk
Development

Successfully merging this pull request may close these issues.

3 participants