Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG Fix] Launching dependent LocalPipelineExecutors with skip_completed=False lead to waiting #300

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/datatrove/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,20 @@ def mark_rank_as_completed(self, rank: int):
"""
self.logging_dir.open(f"completions/{rank:05d}", "w").close()

def get_incomplete_ranks(self, ranks=None) -> list[int]:
def get_incomplete_ranks(self, ranks=None, skip_completed=None) -> list[int]:
"""
Gets a full list of ranks that are still incomplete.
Usually faster than calling `is_rank_completed` for each task.
`skip_completed` can be used to override the internal attribute.
Returns: list of ranks that are incomplete

"""
if skip_completed is None:
skip_completed = self.skip_completed
completed = set(self.logging_dir.list_files("completions"))
return list(
filter(
lambda rank: not self.skip_completed or f"completions/{rank:05d}" not in completed,
lambda rank: not skip_completed or f"completions/{rank:05d}" not in completed,
ranks if ranks is not None else range(self.world_size),
)
)
Expand Down
2 changes: 1 addition & 1 deletion src/datatrove/executor/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def run(self):
if not self.depends._launched:
logger.info(f'Launching dependency job "{self.depends}"')
self.depends.run()
while (incomplete := len(self.depends.get_incomplete_ranks())) > 0:
while (incomplete := len(self.depends.get_incomplete_ranks(skip_completed=True))) > 0: # set skip_completed=True to get *real* incomplete task count
logger.info(f"Dependency job still has {incomplete}/{self.depends.world_size} tasks. Waiting...")
time.sleep(2 * 60)

Expand Down