WIP: [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records #45129

Draft · wants to merge 9 commits into base: main
13 changes: 10 additions & 3 deletions airflow/executors/base_executor.py
@@ -21,7 +21,7 @@
 import logging
 import sys
 from collections import defaultdict, deque
-from collections.abc import Sequence
+from collections.abc import Generator, Sequence
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, Optional

@@ -544,13 +544,20 @@ def execute_async(
         """
         raise NotImplementedError()

-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]
+    ):
         """
         Return the task logs.

         :param ti: A TaskInstance object
         :param try_number: current try_number to read log from
-        :return: tuple of logs and messages
+        :return:
+            - old interface: Tuple of messages and list of log lines.
+            - new interface: Tuple of messages, parsed log streams, total size of logs.
         """
         return [], []

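For context, here is a minimal sketch (not the code in this PR) of how log streams shaped like the new return type could be consumed: a k-way merge via `heapq.merge`, keyed on the `(timestamp, line number)` prefix of each record, so only one record per stream is buffered instead of sorting all log records in memory. The helper names `interleave_log_streams` and `_stream` are made up for illustration.

```python
from __future__ import annotations

import heapq
from collections.abc import Generator

import pendulum

# Shape of one parsed record, mirroring the Generator item type in the new
# signature: (timestamp or None, line number within its source, log line).
LogRecord = tuple[pendulum.DateTime | None, int, str]


def interleave_log_streams(  # hypothetical helper, not part of this PR
    *streams: Generator[LogRecord, None, None],
) -> Generator[str, None, None]:
    """K-way merge individually sorted log streams into one ordered stream.

    heapq.merge pulls one record at a time from each generator, so memory use
    stays proportional to the number of streams, not the total log size.
    """

    def sort_key(record: LogRecord) -> tuple[bool, pendulum.DateTime | None, int]:
        timestamp, line_num, _ = record
        # Untimestamped records sort after timestamped ones; the per-stream
        # line number keeps ties stable.
        return (timestamp is None, timestamp, line_num)

    for _, _, line in heapq.merge(*streams, key=sort_key):
        yield line


def _stream(records: list[LogRecord]) -> Generator[LogRecord, None, None]:
    """Stand-in for a lazily parsed log source (file, remote chunk, ...)."""
    yield from records


merged = interleave_log_streams(
    _stream(
        [
            (pendulum.datetime(2025, 1, 1, 0, 0, 0), 0, "worker: starting"),
            (pendulum.datetime(2025, 1, 1, 0, 0, 2), 1, "worker: done"),
        ]
    ),
    _stream([(pendulum.datetime(2025, 1, 1, 0, 0, 1), 0, "scheduler: queued")]),
)
print(list(merged))  # ['worker: starting', 'scheduler: queued', 'worker: done']
```

Because every input stream is already ordered, the merge can be streamed straight to the webserver response without ever materializing the combined log.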