Skip to content

Commit

Permalink
Merge pull request #11 from upstash/DX-1555
Browse files Browse the repository at this point in the history
DX-1555: pydoc
  • Loading branch information
CahidArda authored Jan 14, 2025
2 parents bfe175e + 55e7666 commit 4300219
Showing 13 changed files with 282 additions and 0 deletions.
24 changes: 24 additions & 0 deletions upstash_workflow/asyncio/context/auto_executor.py
Original file line number Diff line number Diff line change
@@ -34,6 +34,15 @@ async def add_step(self, step_info: BaseLazyStep[TResult]) -> TResult:
return cast(TResult, await self.run_single(step_info))

async def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
"""
Executes a step:
- If the step result is available in the steps, returns the result
- If the result is not avaiable, runs the function
- Sends the result to QStash
:param lazy_step: lazy step to execute
:return: step result
"""
if self.step_count < self.non_plan_step_count:
step = self.steps[self.step_count + self.plan_step_count]
validate_step(lazy_step, step)
@@ -46,6 +55,11 @@ async def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
async def submit_steps_to_qstash(
self, steps: List[DefaultStep], lazy_steps: List[BaseLazyStep[Any]]
) -> None:
"""
sends the steps to QStash as batch
:param steps: steps to send
"""
if not steps:
raise WorkflowError(
f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}"
@@ -110,6 +124,16 @@ async def submit_steps_to_qstash(


def validate_step(lazy_step: BaseLazyStep[Any], step_from_request: DefaultStep) -> None:
"""
Given a BaseLazyStep which is created during execution and a Step parsed
from the incoming request; compare the step names and types to make sure
that they are the same.
Raises `WorkflowError` if there is a difference.
:param lazy_step: lazy step created during execution
:param step_from_request: step parsed from incoming request
"""
if lazy_step.step_name != step_from_request.step_name:
raise WorkflowError(
f"Incompatible step name. Expected '{lazy_step.step_name}', "
10 changes: 10 additions & 0 deletions upstash_workflow/asyncio/context/context.py
Original file line number Diff line number Diff line change
@@ -34,6 +34,12 @@


class WorkflowContext(Generic[TInitialPayload]):
"""
Upstash Workflow context
See the docs for fields and methods https://upstash.com/docs/workflow/basics/context
"""

def __init__(
self,
qstash_client: AsyncQStash,
@@ -164,4 +170,8 @@ async def call(
return cast(CallResponse[Any], result)

async def _add_step(self, step: BaseLazyStep[TResult]) -> TResult:
"""
Adds steps to the executor. Needed so that it can be overwritten in
DisabledWorkflowContext.
"""
return await self._executor.add_step(step)
6 changes: 6 additions & 0 deletions upstash_workflow/asyncio/workflow_parser.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,12 @@


async def get_payload(request: Request) -> Optional[str]:
"""
Gets the request body. If that fails, returns None
:param request: request received in the workflow api
:return: request body
"""
try:
return json.dumps(await request.json())
except Exception:
22 changes: 22 additions & 0 deletions upstash_workflow/asyncio/workflow_requests.py
Original file line number Diff line number Diff line change
@@ -84,6 +84,28 @@ async def handle_third_party_call_result(
workflow_url: str,
retries: int,
) -> Literal["call-will-retry", "is-call-return", "continue-workflow"]:
"""
Check if the request is from a third party call result. If so,
call QStash to add the result to the ongoing workflow.
Otherwise, do nothing.
### How third party calls work
In third party calls, we publish a message to the third party API.
the result is then returned back to the workflow endpoint.
Whenever the workflow endpoint receives a request, we first check
if the incoming request is a third party call result coming from QStash.
If so, we send back the result to QStash as a result step.
:param request: Incoming request
:param request_payload: Request payload
:param client: QStash client
:param workflow_url: Workflow URL
:param retries: Number of retries
:return: "call-will-retry", "is-call-return" or "continue-workflow"
"""
try:
if request.headers and request.headers.get("Upstash-Workflow-Callback"):
if request_payload:
24 changes: 24 additions & 0 deletions upstash_workflow/context/auto_executor.py
Original file line number Diff line number Diff line change
@@ -34,6 +34,15 @@ def add_step(self, step_info: BaseLazyStep[TResult]) -> TResult:
return cast(TResult, self.run_single(step_info))

def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
"""
Executes a step:
- If the step result is available in the steps, returns the result
- If the result is not avaiable, runs the function
- Sends the result to QStash
:param lazy_step: lazy step to execute
:return: step result
"""
if self.step_count < self.non_plan_step_count:
step = self.steps[self.step_count + self.plan_step_count]
validate_step(lazy_step, step)
@@ -46,6 +55,11 @@ def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
def submit_steps_to_qstash(
self, steps: List[DefaultStep], lazy_steps: List[BaseLazyStep[Any]]
) -> None:
"""
sends the steps to QStash as batch
:param steps: steps to send
"""
if not steps:
raise WorkflowError(
f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}"
@@ -110,6 +124,16 @@ def submit_steps_to_qstash(


def validate_step(lazy_step: BaseLazyStep[Any], step_from_request: DefaultStep) -> None:
"""
Given a BaseLazyStep which is created during execution and a Step parsed
from the incoming request; compare the step names and types to make sure
that they are the same.
Raises `WorkflowError` if there is a difference.
:param lazy_step: lazy step created during execution
:param step_from_request: step parsed from incoming request
"""
if lazy_step.step_name != step_from_request.step_name:
raise WorkflowError(
f"Incompatible step name. Expected '{lazy_step.step_name}', "
10 changes: 10 additions & 0 deletions upstash_workflow/context/context.py
Original file line number Diff line number Diff line change
@@ -33,6 +33,12 @@


class WorkflowContext(Generic[TInitialPayload]):
"""
Upstash Workflow context
See the docs for fields and methods https://upstash.com/docs/workflow/basics/context
"""

def __init__(
self,
qstash_client: QStash,
@@ -163,4 +169,8 @@ def call(
return cast(CallResponse[Any], result)

def _add_step(self, step: BaseLazyStep[TResult]) -> TResult:
"""
Adds steps to the executor. Needed so that it can be overwritten in
DisabledWorkflowContext.
"""
return self._executor.add_step(step)
14 changes: 14 additions & 0 deletions upstash_workflow/error.py
Original file line number Diff line number Diff line change
@@ -4,12 +4,20 @@


class WorkflowError(QStashError):
"""
Error raised during Workflow execution
"""

def __init__(self, message: str) -> None:
super().__init__(message)
self.name = "WorkflowError"


class WorkflowAbort(Exception):
"""
Raised when the workflow executes a function successfully and aborts to end the execution
"""

def __init__(
self,
step_name: str,
@@ -32,6 +40,12 @@ def __init__(


def format_workflow_error(error: object) -> Dict[str, str]:
"""
Formats an unknown error to match the FailureFunctionPayload format
:param error:
:return:
"""
if isinstance(error, Exception):
return {"error": error.__class__.__name__, "message": str(error)}
return {"error": "Error", "message": "An error occurred while executing workflow."}
15 changes: 15 additions & 0 deletions upstash_workflow/fastapi.py
Original file line number Diff line number Diff line change
@@ -37,6 +37,21 @@ def post(
[Union[RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]]],
Union[RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]],
]:
"""
Decorator to serve a Upstash Workflow in a FastAPI project.
:param route_function: A function that uses WorkflowContext as a parameter and runs a workflow.
:param qstash_client: QStash client
:param on_step_finish: Function called to return a response after each step execution
:param initial_payload_parser: Function to parse the initial payload passed by the user
:param receiver: Receiver to verify *all* requests by checking if they come from QStash. By default, a receiver is created from the env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY if they are set.
:param base_url: Base Url of the workflow endpoint. Can be used to set if there is a local tunnel or a proxy between QStash and the workflow endpoint. Will be set to the env variable UPSTASH_WORKFLOW_URL if not passed. If the env variable is not set, the url will be infered as usual from the `request.url` or the `url` parameter in `serve` options.
:param env: Optionally, one can pass an env object mapping environment variables to their keys. Useful in cases like cloudflare with hono.
:param retries: Number of retries to use in workflow requests, 3 by default
:param url: Url of the endpoint where the workflow is set up. If not set, url will be inferred from the request.
:return:
"""

def decorator(
route_function: Union[
RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]
42 changes: 42 additions & 0 deletions upstash_workflow/serve/authorization.py
Original file line number Diff line number Diff line change
@@ -11,12 +11,43 @@
class DisabledWorkflowContext(
Generic[TInitialPayload], WorkflowContext[TInitialPayload]
):
"""
Workflow context which throws `WorkflowAbort` before running the steps.
Used for making a dry run before running any steps to check authentication.
Consider an endpoint like this:
```python
@serve.post("/auth")
async def auth(context: WorkflowContext[str]) -> None:
if context.headers.get("authentication" != "Bearer secret_password"):
print("Authentication failed.")
return
# ...
```
the `serve` method will first call the route_function with a `DisabledWorkflowContext`.
Here is the action we take in different cases:
- "step-found": we will run the workflow related sections of `serve`.
- "run-ended": simply return success and end the workflow
- error: returns 500.
"""

__disabled_message = "disabled-qstash-worklfow-run"

def _add_step(self, _step: BaseLazyStep[TResult]) -> TResult:
"""
Overwrite the `WorkflowContext._add_step` method to always raise `WorkflowAbort`
error in order to stop the execution whenever we encounter a step.
:param _step:
"""
raise WorkflowAbort(self.__disabled_message)

def cancel(self) -> None:
"""
overwrite cancel method to do nothing
"""
return

@classmethod
@@ -25,6 +56,17 @@ def try_authentication(
route_function: Callable[[WorkflowContext[TInitialPayload]], None],
context: WorkflowContext[TInitialPayload],
) -> Literal["run-ended", "step-found"]:
"""
copies the passed context to create a DisabledWorkflowContext. Then, runs the
route function with the new context.
- returns "run-ended" if there are no steps found or
if the auth failed and user called `return`
- returns "step-found" if DisabledWorkflowContext._add_step is called.
- if there is another error, returns the error.
:param route_function:
"""
disabled_context = DisabledWorkflowContext(
qstash_client=QStash(base_url="disabled-client", token="disabled-client"),
workflow_run_id=context.workflow_run_id,
13 changes: 13 additions & 0 deletions upstash_workflow/serve/options.py
Original file line number Diff line number Diff line change
@@ -28,6 +28,19 @@ def process_options(
retries: Optional[int] = DEFAULT_RETRIES,
url: Optional[str] = None,
) -> WorkflowServeOptions[TInitialPayload, TResponse]:
"""
Fills the options with default values if they are not provided.
Default values for:
- qstash_client: QStash client created with QSTASH_TOKEN env var
- on_step_finish: returns a Response with workflowRunId in the body (status: 200)
- initial_payload_parser: calls json.loads if initial request body exists.
- receiver: a Receiver if the required env vars are set
- base_url: env variable UPSTASH_WORKFLOW_URL
- env: os.environ
- retries: DEFAULT_RETRIES
- url: None
"""
environment = env if env is not None else dict(os.environ)

receiver_environment_variables_set = bool(
24 changes: 24 additions & 0 deletions upstash_workflow/serve/serve.py
Original file line number Diff line number Diff line change
@@ -41,6 +41,21 @@ def serve(
retries: Optional[int] = None,
url: Optional[str] = None,
) -> Dict[str, Callable[[TRequest], TResponse]]:
"""
Creates a method that handles incoming requests and runs the provided
route function as a workflow.
:param route_function: A function that uses WorkflowContext as a parameter and runs a workflow.
:param qstash_client: QStash client
:param on_step_finish: Function called to return a response after each step execution
:param initial_payload_parser: Function to parse the initial payload passed by the user
:param receiver: Receiver to verify *all* requests by checking if they come from QStash. By default, a receiver is created from the env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY if they are set.
:param base_url: Base Url of the workflow endpoint. Can be used to set if there is a local tunnel or a proxy between QStash and the workflow endpoint. Will be set to the env variable UPSTASH_WORKFLOW_URL if not passed. If the env variable is not set, the url will be infered as usual from the `request.url` or the `url` parameter in `serve` options.
:param env: Optionally, one can pass an env object mapping environment variables to their keys. Useful in cases like cloudflare with hono.
:param retries: Number of retries to use in workflow requests, 3 by default
:param url: Url of the endpoint where the workflow is set up. If not set, url will be inferred from the request.
:return: An method that consumes incoming requests and runs the workflow.
"""
processed_options = process_options(
qstash_client=qstash_client,
on_step_finish=on_step_finish,
@@ -61,6 +76,15 @@ def serve(
url = processed_options.url

def _handler(request: TRequest) -> TResponse:
"""
Handles the incoming request, triggering the appropriate workflow steps.
Calls `trigger_first_invocation()` if it's the first invocation.
Otherwise, starts calling `trigger_route_function()` to execute steps in the workflow.
Finally, calls `trigger_workflow_delete()` to remove the workflow from QStash.
:param request: The incoming request to handle.
:return: A response.
"""
workflow_url = determine_urls(cast(Request, request), url, base_url)

request_payload = get_payload(request) or ""
Loading

0 comments on commit 4300219

Please sign in to comment.