Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1573: raise error on parallel execution with asyncio.gather() #24

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions upstash_workflow/asyncio/context/auto_executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List, Union, Literal, cast, Any, TypeVar
import json
from threading import Lock
from qstash.message import BatchJsonRequest
from upstash_workflow.constants import NO_CONCURRENCY
from upstash_workflow.error import WorkflowError, WorkflowAbort
Expand Down Expand Up @@ -28,6 +29,8 @@ def __init__(self, context: AsyncWorkflowContext[Any], steps: List[DefaultStep])
self.step_count: int = 0
self.plan_step_count: int = 0
self.executing_step: Union[str, Literal[False]] = False
self._already_executed: bool = False
self._lock = Lock()

async def add_step(self, step_info: _BaseLazyStep[TResult]) -> TResult:
self.step_count += 1
Expand All @@ -48,6 +51,13 @@ async def run_single(self, lazy_step: _BaseLazyStep[TResult]) -> Any:
_validate_step(lazy_step, step)
return step.out

with self._lock:
mdumandag marked this conversation as resolved.
Show resolved Hide resolved
if self._already_executed:
raise WorkflowError(
"Running parallel steps is not yet available in workflow-py. Ensure that you are awaiting the steps sequentially."
)
self._already_executed = True

result_step = await lazy_step.get_result_step(NO_CONCURRENCY, self.step_count)
await self.submit_steps_to_qstash([result_step], [lazy_step])
return result_step.out
Expand Down
Loading