Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Allow pushing and pulling XCom from Task Context #45075

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Dec 19, 2024

Part of #44481

Example DAG used:

"""
### DAG Tutorial Documentation
This DAG is demonstrating an Extract -> Transform -> Load pipeline
"""

from __future__ import annotations

import json
import textwrap

import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
    "sdk_tutorial_dag",
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    def transform(**kwargs):
        ti = kwargs["ti"]
        order_data = ti.xcom_pull(task_ids="extract", key="order_data")

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    def load(**kwargs):
        ti = kwargs["ti"]
        total_order_value = ti.xcom_pull(task_ids="transform", key="total_order_value")

        print(total_order_value)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kaxil kaxil changed the title AIP-72: Allow pushing and pulling XCom from a Task AIP-72: Allow pushing and pulling XCom from Task Context Dec 19, 2024
@kaxil kaxil force-pushed the conn-context branch 4 times, most recently from ed34297 to 3a60948 Compare December 20, 2024 12:43
kaxil added a commit that referenced this pull request Dec 20, 2024
It fixes the following bug

```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
	{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
	{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	...
```

To reproduce just run `tutorial_dag` or the following minimal dag:

```python
import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
    "sdk_tutorial_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    extract_task
```

I need this fix for #45075 (part of the getting [Task Context working with AIP-72](#44481))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant