Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add submit_taskgraph for a common UX #641

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/tiledb/cloud/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ._common import run_dag
from ._common import serialize_filter
from ._common import set_aws_context
from ._common import submit_taskgraph
from .consolidate import consolidate_and_vacuum
from .consolidate import consolidate_fragments
from .consolidate import group_fragments
Expand All @@ -31,6 +32,7 @@
"read_file",
"run_dag",
"set_aws_context",
"submit_taskgraph",
"consolidate_fragments",
"consolidate_and_vacuum",
"group_fragments",
Expand Down
69 changes: 69 additions & 0 deletions src/tiledb/cloud/utilities/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures._base import TimeoutError
from fnmatch import fnmatch
from typing import (
Any,
Expand Down Expand Up @@ -195,6 +196,74 @@ def run_dag(
_print_logs(graph, debug=debug)


def submit_taskgraph(
graph: dag.DAG,
*,
wait: bool = True,
update_sec: int = 10,
) -> Mapping[str, str]:
"""
Submit a taskgraph and optionally wait for completion.

:param graph: taskgraph to submit
:param wait: wait for the taskgraph to complete, defaults to True
:param update_sec: interval in seconds to update the progress bar, defaults to 10
:return: dictionary with status and graph_id
"""

# Try to import tqdm, if not available, stub it out
try:
from tqdm import tqdm
except ImportError:

class tqdm:
def __init__(self, *args, **kwargs):
print("Please install tqdm to display a progress timer.")

def set_description_str(self, *args, **kwargs):
pass

graph.compute()

# Display a link to the log in TileDB, if available
if graph.server_graph_uuid:
print(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's opt in to printing. I don't think a library function should print by default. I can't think of anything in Python's standard lib that prints except print().

Like if verbose and graph.server_graph_uuid:

f"Taskgraph submitted to TileDB - https://cloud.tiledb.com/activity/taskgraphs/{graph.namespace}/{graph.server_graph_uuid}",
)

if wait:
# Display the progress bar in stdout, so the output is not red in a notebook
pbar = tqdm(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, I think a progress bar should be explicitly opt-in and not conditioned on waiting. I can easily imagine wanting to waiting silently. How about if verbose: pbar = ...?

desc="Not Started",
bar_format="{desc}: {elapsed} (min:sec)",
file=sys.stdout,
)
cancelled = False

while True:
try:
graph.wait(update_sec)
except TimeoutError:
# Still running
pbar.set_description_str(str(graph.status))
continue
Copy link
Collaborator

@sgillies sgillies Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused about what graph.wait() does. Is the status set only when it returns? This is just a question stemming from my ignorance, not a blocker.

except RuntimeError as e:
if "No executions found for done Node" in str(e):
cancelled = True
except StopIteration:
cancelled = True

if cancelled:
pbar.set_description("Cancelled")
return {"status": "Cancelled", "graph_id": str(graph.server_graph_uuid)}

# The taskgraph is done, break the loop
pbar.set_description_str(str(graph.status))
break

return {"status": str(graph.status), "graph_id": str(graph.server_graph_uuid)}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more natural for a Python function to return a tuple, no? More like standard lib functions is what i mean.



def _print_logs(
graph: dag.DAG,
*,
Expand Down
21 changes: 21 additions & 0 deletions tests/common/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import pathlib
import pickle
import tempfile
import time
import unittest

import pytz # Test-only dependency.

from tiledb.cloud import dag
from tiledb.cloud._common import utils
from tiledb.cloud.utilities import find
from tiledb.cloud.utilities import submit_taskgraph


class UtilsTest(unittest.TestCase):
Expand Down Expand Up @@ -69,6 +72,24 @@ def test_find(self):
len(list(find(tmp, include=lambda f: f.endswith(".dat")))), 1
)

def test_submit_taskgraph(self):
# Submit and wait for completion.
tg = dag.DAG()

result = tg.submit_local(lambda: (time.sleep(3), 42)[1])
status = submit_taskgraph(tg, update_sec=1)

assert status == {"status": "Completed", "graph_id": "None"}
assert result.result() == 42

# Submit and continue.
tg = dag.DAG()
result = tg.submit_local(lambda: (time.sleep(3), 42)[1])
status = submit_taskgraph(tg, wait=False)

assert status == {"status": "Running", "graph_id": "None"}
assert result.result() == 42


def _b64_unpickle(x):
raw = base64.b64decode(x)
Expand Down
Loading