local resource override #42

Open
wants to merge 12 commits into
base: main

@tylerzupan tylerzupan commented Nov 20, 2024

Goal: Allow a Prefect task decorated with @remote to run both locally and in the Cloud without changing the code or the remote args.

Prefect 2 tasks decorated with @remote will fail if a resource is set but the flow is run locally:

>>> @task(name="test_task")
... @remote(resource=ns8_gcp_resource)
... def test_cascade_task():
...     a = 1
...     b = 3
...     return a + b
...
>>> @flow
... def main_flow():
...     cascade_output = test_cascade_task()
...
>>> main_flow(return_state=True)
  File "/Users/tzupan/Development/sq-mml-modeling/.venv/lib/python3.9/site-packages/block_cascade/decorators.py", line 208, in remote_func
    raise RuntimeError(
RuntimeError: Unable to infer remaining environment for GcpResource. Please provide a complete environment to the configured GcpResource.
14:56:17.302 | ERROR   | Flow run 'yellow-civet' - Finished in state Failed('Flow run encountered an exception. RuntimeError: Unable to infer remaining environment for GcpResource. Please provide a complete environment to the configured GcpResource.')

Additionally, because of the way Prefect 2+ creates tasks, using remote_resource does not solve this problem:

>>> @task(name="test_task")
... @remote(resource=ns8_gcp_resource)
... def test_cascade_task(remote_resource):
...     a = 1
...     b = 3
...     return a + b
...
>>> @flow
... def main_flow():
...     cascade_output = test_cascade_task(remote_resource=None)
...
>>> main_flow(return_state=True)
  File "/Users/tzupan/Development/sq-mml-modeling/.venv/lib/python3.9/site-packages/block_cascade/decorators.py", line 208, in remote_func
    raise RuntimeError(
RuntimeError: Unable to infer remaining environment for GcpResource. Please provide a complete environment to the configured GcpResource.
14:56:17.302 | ERROR   | Flow run 'yellow-civet' - Finished in state Failed('Flow run encountered an exception. RuntimeError: Unable to infer remaining environment for GcpResource. Please provide a complete environment to the configured GcpResource.')

This PR adds an optional no_resource_on_local argument that lets a Cascade-decorated task override the resource to None when the flow is run locally AND the argument is set to True:

>>> @task(name="test_task")
... @remote(resource=ns8_gcp_resource, no_resource_on_local=True)
... def test_cascade_task():
...     a = 1
...     b = 3
...     return a + b
...
>>> @flow
... def main_flow():
...     cascade_output = test_cascade_task()
...
>>> main_flow(return_state=True)
16:13:08.342 | INFO    | prefect.engine - Created flow run 'impetuous-gorilla' for flow 'main-flow'
16:13:08.344 | INFO    | Flow run 'impetuous-gorilla' - View at https://app.prefect.cloud/account/74ac256a-9115-4944-9aae-0e14d67fec00/workspace/a60bc68c-d550-411f-9149-c952c21858d1/flow-runs/flow-run/d9443599-77a4-4997-926b-a74ad708308a

16:13:08.765 | INFO    | Flow run 'impetuous-gorilla' - Created task run 'test_task-0' for task 'test_task'
16:13:08.768 | INFO    | Flow run 'impetuous-gorilla' - Executing 'test_task-0' immediately...
16:13:09.085 | INFO    | Task run 'test_task-0' - Via cloud? False
16:13:09.086 | INFO    | Task run 'test_task-0' - Executing task with LocalExecutor.
16:13:09.228 | INFO    | Task run 'test_task-0' - Finished in state Completed()
16:13:09.375 | INFO    | Flow run 'impetuous-gorilla' - Finished in state Completed('All states completed.')
Completed(message='All states completed.', type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `list`'))

@tylerzupan tylerzupan marked this pull request as ready for review November 21, 2024 18:13

@jhamet93 jhamet93 left a comment

Needs a version bump once approved by all expected parties!

@@ -148,6 +152,11 @@ def remote_func(*args, **kwargs):
# so that it can be sent to the remote executor with its parameters
packed_func = wrapped_partial(func, *args, **kwargs)

# if running a flow locally ignore the remote resource, even if specified
# necessary for running a @remote decorated task in a local flow
if not via_cloud and no_resource_on_local:

nit: add a log that resource is being ignored

Thanks for the review. Logging added!
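
For readers following the thread, here is a minimal, self-contained sketch of how the override plus the requested log line might fit together. It is not the block_cascade implementation: the `remote` signature below is a hypothetical stand-in, `via_cloud` is passed as an argument purely for illustration (the real decorator appears to infer it from the run context), the logger name and message are made up, and remote dispatch is replaced by a placeholder exception.

```python
import functools
import logging

# Hypothetical logger name; the real project may log through Prefect's run logger.
logger = logging.getLogger("cascade_sketch")


def remote(resource=None, no_resource_on_local=False, via_cloud=False):
    """Illustrative stand-in for @remote; NOT the real block_cascade signature."""

    def decorator(func):
        @functools.wraps(func)
        def remote_func(*args, **kwargs):
            effective_resource = resource
            # Same condition as in the diff: only a local run with the flag
            # set drops the configured resource.
            if not via_cloud and no_resource_on_local:
                if effective_resource is not None:
                    logger.info(
                        "Local flow run: ignoring resource %r for %s",
                        effective_resource,
                        func.__name__,
                    )
                effective_resource = None

            if effective_resource is None:
                # No resource: run the function in-process.
                return func(*args, **kwargs)

            # Placeholder: submitting to a remote executor is out of scope here.
            raise NotImplementedError("remote dispatch is not part of this sketch")

        return remote_func

    return decorator


@remote(resource="some-gcp-resource", no_resource_on_local=True)
def add_numbers():
    a = 1
    b = 3
    return a + b


@remote(resource="some-gcp-resource")
def add_numbers_without_flag():
    return 1 + 3
```

With the flag set, `add_numbers()` runs in-process and the log records that the resource was skipped; without the flag, the sketch still takes the remote path, so existing behavior is unchanged by default.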
