OpenLineage failed to send DAG start event #44984

Open
paul-laffon-dd opened this issue Dec 17, 2024 · 7 comments
Labels
area:providers, kind:bug, needs-triage, provider:openlineage (AIP-53)

Comments

@paul-laffon-dd

Apache Airflow Provider(s)

openlineage

Versions of Apache Airflow Providers

1.14.0

I'm also seeing missing DAG start events for provider versions <= 1.12.0. However, those versions weren't logging the exception, making it difficult to determine whether this is the same issue.

Apache Airflow version

2.10.1

Operating System

Amazon Linux

Deployment

Amazon (AWS) MWAA

Deployment details

MWAA with:

  • requirements.txt with apache-airflow-providers-openlineage==1.14.0
  • startup.sh with OPENLINEAGE_URL pointing to a webserver logging all received requests

What happened

The OpenLineage provider failed to send some DAG start events, with the following exception in the scheduler logs:

[2024-12-17T00:44:00.564+0000] {listener.py:528} WARNING - Failed to submit method to executor
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <functools._lru_cache_wrapper object at 0x7fb8f1c02980>: 
it's not the same object as airflow.models.abstractoperator.AbstractOperator.get_parse_time_mapped_ti_count
"""

What you think should happen instead

No response

How to reproduce

The failures to send events were non-deterministic and appear to be caused by a race condition. They seem to occur more frequently when multiple DAGs are being scheduled simultaneously.

I used the code below to reproduce the issue; at least one DAG start event failed to be sent almost every minute.

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Four DAGs scheduled every minute, so DAG start events are emitted frequently.
for i in range(4):
    with DAG(
        f'frequent_dag_{i}',
        schedule_interval=timedelta(minutes=1),
        start_date=days_ago(1),
        catchup=False,
    ) as dag:
        def task():
            print("Task is running")

        task = PythonOperator(
            task_id=f'print_task_{i}',
            python_callable=task,
            dag=dag,
        )

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

paul-laffon-dd added the area:providers, kind:bug and needs-triage labels on Dec 17, 2024

boring-cyborg bot commented Dec 17, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

dosubot bot added the provider:openlineage (AIP-53) label on Dec 17, 2024

eladkal commented Dec 17, 2024

cc @kacpermuda seems like #42448 didn't fix this issue?


kacpermuda commented Dec 17, 2024

I think it's unrelated to #42448, but it should not happen anyway. Thanks @paul-laffon-dd for reporting that, I'll investigate it in my free time.
cc @mobuchowski @JDarDagran

paul-laffon-dd (Author) commented

Thanks @kacpermuda

From my understanding of the issue, one of the arguments to dag_started is trying to serialize an operator on which get_parse_time_mapped_ti_count has already been computed and cached. It's unclear to me which facet is holding this operator.
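
For illustration, here is a minimal standalone sketch of the failure mechanism (made-up names, not the actual facet code). pickle serializes a functools.lru_cache wrapper by reference to its qualified name and checks that the name still resolves to the same object; when it doesn't, it raises exactly this PicklingError:

import functools
import pickle

def compute(x):
    return x * 2

# Wrapping after definition: the wrapper inherits __qualname__ "compute",
# but the module-level name "compute" still refers to the plain function.
cached_compute = functools.lru_cache(maxsize=None)(compute)

try:
    pickle.dumps(cached_compute)
except pickle.PicklingError as exc:
    print(exc)
    # Can't pickle <functools._lru_cache_wrapper object at 0x...>:
    # it's not the same object as __main__.compute

In the Airflow case, such a wrapper is presumably reachable from one of the dag_started arguments, so the ForkingPickler used by the ProcessPoolExecutor hits the same check.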

What do you think of switching to a ThreadPoolExecutor instead of a ProcessPoolExecutor? This would eliminate the need for serialization while still allowing asynchronous execution.


potiuk commented Dec 20, 2024

> What do you think of switching to a ThreadPoolExecutor instead of a ProcessPoolExecutor? This would eliminate the need for serialization while still allowing asynchronous execution.

From what I understand, there were MANY problems with the previous implementation using ThreadPoolExecutor. The problem is that threads are a very flawed concept in Python due to the GIL, and spawning new threads without full control over the other running threads and what they do (especially when you involve low-level C code implemented in some libraries called from Python code) introduces a lot of contention, deadlock possibilities, and various kinds of errors, especially if those libraries are not written in a fully "thread-safe" way. I think @kacpermuda and @mobuchowski had a LOT of problems, particularly with the Snowflake integration, caused by this.


mobuchowski commented Dec 20, 2024

The solution should be to not serialize any operators. I'm not exactly sure where the operator is coming from (I don't think it's _get_tasks_details(dag: DAG) -> dict). If we need something from the operator, we should select the properties we need and pass them explicitly, rather than relying on pickle.
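
As a rough sketch of that direction (hypothetical helper names, not the actual listener code), the executor submission would receive only plain, picklable data extracted from the operators:

from concurrent.futures import ProcessPoolExecutor

def _task_summary(task) -> dict:
    # Keep only plain, picklable fields; the operator object itself
    # never crosses the process boundary.
    return {
        "task_id": task.task_id,
        "operator_class": type(task).__name__,
    }

def _emit_dag_started(dag_id: str, task_summaries: list[dict]) -> None:
    # Hypothetical worker: builds and sends the OpenLineage DAG start
    # event from plain data only.
    ...

# Only dicts and strings are submitted, so nothing operator-shaped has to be pickled:
# executor = ProcessPoolExecutor(max_workers=1)
# executor.submit(_emit_dag_started, dag.dag_id, [_task_summary(t) for t in dag.tasks])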

Regarding ThreadPoolExecutor, we switched away from that solution since it caused even worse issues: #39235

@paul-laffon-dd do you know where it's coming from, or have a reproduction?

paul-laffon-dd (Author) commented

I don't know which facet this is coming from, and I don't have a way to deterministically reproduce it. From my understanding, this only happens if the result of get_parse_time_mapped_ti_count is cached before the serialization occurs.
