Each task execution time depends on the total number of tasks defined (got worse in 2.4.0) #26597
-
I think the problem is with the warnings, and it would be great to see where they come from - more likely than not, just logging that much information is what slows things down. Airflow has always parsed all the DAGs defined in a file for every task execution; that has not changed. But 2.4.0 is the first release where you can officially optimize this (it is an experimental feature, but you can try it out) by writing your Dynamic DAG in a way that optimizes task execution. It is not possible to optimize this automatically (Dynamic DAG creation might have side effects, so we cannot skip parsing/creating all the DAGs in that case). It is in your hands to write your DAG in a way that takes advantage of it. You can also read more about this approach in two blog posts which discuss the subject:
Please read those, apply the optimizations, and let us know the results you get. Regarding the warnings - you have to fix your DAGs to get rid of them. Converting this into a discussion.
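For reference, the experimental 2.4.0 mechanism referred to above appears to be the new `get_parsing_context()` helper; below is a minimal sketch of the documented pattern, with illustrative DAG ids and counts (not the poster's actual code):

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dag_parsing_context import get_parsing_context

# When a worker parses this file to run a single task, the parsing context
# carries that task's dag_id; during regular scheduler parsing it is None.
current_dag_id = get_parsing_context().dag_id

for i in range(10):  # illustrative: ten generated DAGs
    dag_id = f"generated_dag_{i}"
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # skip building DAGs this task run does not need

    with DAG(dag_id=dag_id, start_date=datetime(2022, 1, 1), schedule=None) as dag:
        EmptyOperator(task_id="do_something")
    globals()[dag_id] = dag
```

With this guard in place, the worker only pays the parse cost of the one DAG that contains the task being executed, while the scheduler (where the context is empty) still generates all of them.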
-
I was getting the same warnings in 2.4.0, in DAGs with simple TaskFlow-based tasks (nothing dynamic) that weren't giving warnings prior to 2.4.0, and I wanted to test with a simpler one. I copied the exact DAG from the TaskFlow tutorial documentation, the simple extract-transform-load example located here: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#example-taskflow-api-pipeline

```python
import json

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()
```

Running this gives the same warnings, even though no complex dependencies and no dynamic task generation exist. Log from the last step:
What could be the reason for this?
-
@potiuk Why did you convert this to a discussion? It is a huge regression. Airflow should not be issuing those warnings, nor should it slow down execution relative to 2.3. The dynamic nature of the OP's DAG is just an easy way of demonstrating the problem.
-
Sorry for muddling things by focusing on performance instead of the warnings. It seemed to me that the warnings were a clue that too much work was being done. I was worried that we might fix the warnings and hide the clue, while leaving the performance perspective unexplored. Also, just so I have my terms straight... is my test DAG above a dynamic one? I think it's known how many tasks each DAG has at parse time. What's weird about it is that it uses both the TaskFlow and the traditional APIs (as did the DAG that initially led me to this issue).
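On the terminology question: the tutorial DAG above has a fixed set of tasks known at parse time, so it is static in the usual sense. "Dynamic DAG generation" generally means top-level code shaping the DAGs or tasks from data available only at parse time, roughly like this illustrative sketch:

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative: this list could come from a config file, a database, or an
# API call executed at parse time, so the task set can change between parses.
TABLES = ["orders", "customers", "invoices"]

with DAG(dag_id="dynamic_example", start_date=datetime(2022, 1, 1), schedule=None) as dag:
    for table in TABLES:
        EmptyOperator(task_id=f"load_{table}")
```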
-
Apache Airflow version
2.4.0
What happened
Here are 10 DAGs with 200 tasks each:
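The snippet itself is not reproduced above; a minimal sketch of one way to generate that load follows, with names chosen to match the Dag9Task99-style ids mentioned below, but otherwise illustrative:

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative load generator: 10 DAGs x 200 tasks in a single file.
# schedule_interval is used because it works on both 2.3.4 and 2.4.0.
for d in range(10):
    with DAG(dag_id=f"Dag{d}", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
        for t in range(200):
            EmptyOperator(task_id=f"Task{t}")
    globals()[f"Dag{d}"] = dag
```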
In 2.3.4, I'd see DAG execution times of 1:11 and task logs like this:
In 2.4.0, I see DAG execution times of 1:40 and task logs like this:
If we double the load, we see a ~3.5x increase in task execution time.
Also, upgrading from 2.3.4 to 2.4.0 shows a ~1.4x increase in task execution time.
Increasing the load to 1000 caused DagBag timeouts in the middle of the warning log:
So it looks like upgrading from 2.3.4 to 2.4.0 slowed things down linearly (a roughly constant ~1.4x factor at each load level).
What you think should happen instead
The content of these warnings is less worrisome to me than the fact that I see warnings about Dag9Task99 in the logs for Dag1Task1. If we really must re-serialize the DAG file every time we run a task, can we at least short-circuit checks that don't pertain to the task at hand?
Better still would be to handle whatever this is just once instead of on every task. I'm guessing that's what's slowing things down. 2.4.0 didn't really cause it; it just made it slightly worse and much more visible.
How to reproduce
Run DAG 0 and keep track of the time. Redeploy with a heavier load and do it again. Notice the volume of dependency warnings.
Operating System
Docker Desktop (macOS)
Versions of Apache Airflow Providers
n/a
Deployment
Official Apache Airflow Helm Chart
Deployment details
Using Kubernetes in Docker Desktop
Dockerfile:
deploy.sh:
Anything else
No response
Are you willing to submit PR?
Code of Conduct