Skip to content

Commit

Permalink
DX-1573: raise error on parallel execution with asyncio.gather() (#24)
Browse files Browse the repository at this point in the history
* feat: raise error on parallel execution with asyncio.gather()

* refactor: remove unnecessary lock
  • Loading branch information
yunusemreozdemir authored Jan 21, 2025
1 parent f2dc09a commit 560656e
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions upstash_workflow/asyncio/context/auto_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self, context: AsyncWorkflowContext[Any], steps: List[DefaultStep])
self.step_count: int = 0
self.plan_step_count: int = 0
self.executing_step: Union[str, Literal[False]] = False
self._already_executed: bool = False

async def add_step(self, step_info: _BaseLazyStep[TResult]) -> TResult:
self.step_count += 1
Expand All @@ -48,6 +49,12 @@ async def run_single(self, lazy_step: _BaseLazyStep[TResult]) -> Any:
_validate_step(lazy_step, step)
return step.out

if self._already_executed:
raise WorkflowError(
"Running parallel steps is not yet available in workflow-py. Ensure that you are awaiting the steps sequentially."
)
self._already_executed = True

result_step = await lazy_step.get_result_step(NO_CONCURRENCY, self.step_count)
await self.submit_steps_to_qstash([result_step], [lazy_step])
return result_step.out
Expand Down

0 comments on commit 560656e

Please sign in to comment.