Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1555: pydoc #11

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions upstash_workflow/asyncio/context/auto_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ async def add_step(self, step_info: BaseLazyStep[TResult]) -> TResult:
return cast(TResult, await self.run_single(step_info))

async def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
"""
Executes a step:
- If the step result is available in the steps, returns the result
- If the result is not avaiable, runs the function
- Sends the result to QStash

:param lazy_step: lazy step to execute
:return: step result
"""
if self.step_count < self.non_plan_step_count:
step = self.steps[self.step_count + self.plan_step_count]
validate_step(lazy_step, step)
Expand All @@ -46,6 +55,11 @@ async def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
async def submit_steps_to_qstash(
self, steps: List[DefaultStep], lazy_steps: List[BaseLazyStep[Any]]
) -> None:
"""
sends the steps to QStash as batch

:param steps: steps to send
"""
if not steps:
raise WorkflowError(
f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}"
Expand Down Expand Up @@ -110,6 +124,16 @@ async def submit_steps_to_qstash(


def validate_step(lazy_step: BaseLazyStep[Any], step_from_request: DefaultStep) -> None:
"""
Given a BaseLazyStep which is created during execution and a Step parsed
from the incoming request; compare the step names and types to make sure
that they are the same.

Raises `WorkflowError` if there is a difference.

:param lazy_step: lazy step created during execution
:param step_from_request: step parsed from incoming request
"""
if lazy_step.step_name != step_from_request.step_name:
raise WorkflowError(
f"Incompatible step name. Expected '{lazy_step.step_name}', "
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/asyncio/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@


class WorkflowContext(Generic[TInitialPayload]):
"""
Upstash Workflow context

See the docs for fields and methods https://upstash.com/docs/workflow/basics/context
"""

def __init__(
self,
qstash_client: AsyncQStash,
Expand Down Expand Up @@ -164,4 +170,8 @@ async def call(
return cast(CallResponse[Any], result)

async def _add_step(self, step: BaseLazyStep[TResult]) -> TResult:
"""
Adds steps to the executor. Needed so that it can be overwritten in
DisabledWorkflowContext.
"""
return await self._executor.add_step(step)
6 changes: 6 additions & 0 deletions upstash_workflow/asyncio/workflow_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@


async def get_payload(request: Request) -> Optional[str]:
"""
Gets the request body. If that fails, returns None

:param request: request received in the workflow api
:return: request body
"""
try:
return json.dumps(await request.json())
except Exception:
Expand Down
22 changes: 22 additions & 0 deletions upstash_workflow/asyncio/workflow_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ async def handle_third_party_call_result(
workflow_url: str,
retries: int,
) -> Literal["call-will-retry", "is-call-return", "continue-workflow"]:
"""
Check if the request is from a third party call result. If so,
call QStash to add the result to the ongoing workflow.

Otherwise, do nothing.

### How third party calls work

In third party calls, we publish a message to the third party API.
the result is then returned back to the workflow endpoint.

Whenever the workflow endpoint receives a request, we first check
if the incoming request is a third party call result coming from QStash.
If so, we send back the result to QStash as a result step.

:param request: Incoming request
:param request_payload: Request payload
:param client: QStash client
:param workflow_url: Workflow URL
:param retries: Number of retries
:return: "call-will-retry", "is-call-return" or "continue-workflow"
"""
try:
if request.headers and request.headers.get("Upstash-Workflow-Callback"):
if request_payload:
Expand Down
24 changes: 24 additions & 0 deletions upstash_workflow/context/auto_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ def add_step(self, step_info: BaseLazyStep[TResult]) -> TResult:
return cast(TResult, self.run_single(step_info))

def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
"""
Executes a step:
- If the step result is available in the steps, returns the result
- If the result is not avaiable, runs the function
- Sends the result to QStash

:param lazy_step: lazy step to execute
:return: step result
"""
if self.step_count < self.non_plan_step_count:
step = self.steps[self.step_count + self.plan_step_count]
validate_step(lazy_step, step)
Expand All @@ -46,6 +55,11 @@ def run_single(self, lazy_step: BaseLazyStep[TResult]) -> Any:
def submit_steps_to_qstash(
self, steps: List[DefaultStep], lazy_steps: List[BaseLazyStep[Any]]
) -> None:
"""
sends the steps to QStash as batch

:param steps: steps to send
"""
if not steps:
raise WorkflowError(
f"Unable to submit steps to QStash. Provided list is empty. Current step: {self.step_count}"
Expand Down Expand Up @@ -110,6 +124,16 @@ def submit_steps_to_qstash(


def validate_step(lazy_step: BaseLazyStep[Any], step_from_request: DefaultStep) -> None:
"""
Given a BaseLazyStep which is created during execution and a Step parsed
from the incoming request; compare the step names and types to make sure
that they are the same.

Raises `WorkflowError` if there is a difference.

:param lazy_step: lazy step created during execution
:param step_from_request: step parsed from incoming request
"""
if lazy_step.step_name != step_from_request.step_name:
raise WorkflowError(
f"Incompatible step name. Expected '{lazy_step.step_name}', "
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@


class WorkflowContext(Generic[TInitialPayload]):
"""
Upstash Workflow context

See the docs for fields and methods https://upstash.com/docs/workflow/basics/context
"""

def __init__(
self,
qstash_client: QStash,
Expand Down Expand Up @@ -163,4 +169,8 @@ def call(
return cast(CallResponse[Any], result)

def _add_step(self, step: BaseLazyStep[TResult]) -> TResult:
"""
Adds steps to the executor. Needed so that it can be overwritten in
DisabledWorkflowContext.
"""
return self._executor.add_step(step)
14 changes: 14 additions & 0 deletions upstash_workflow/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@


class WorkflowError(QStashError):
"""
Error raised during Workflow execution
"""

def __init__(self, message: str) -> None:
super().__init__(message)
self.name = "WorkflowError"


class WorkflowAbort(Exception):
"""
Raised when the workflow executes a function successfully and aborts to end the execution
"""

def __init__(
self,
step_name: str,
Expand All @@ -32,6 +40,12 @@ def __init__(


def format_workflow_error(error: object) -> Dict[str, str]:
"""
Formats an unknown error to match the FailureFunctionPayload format

:param error:
:return:
"""
if isinstance(error, Exception):
return {"error": error.__class__.__name__, "message": str(error)}
return {"error": "Error", "message": "An error occurred while executing workflow."}
15 changes: 15 additions & 0 deletions upstash_workflow/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ def post(
[Union[RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]]],
Union[RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]],
]:
"""
Decorator to serve a Upstash Workflow in a FastAPI project.

:param route_function: A function that uses WorkflowContext as a parameter and runs a workflow.
:param qstash_client: QStash client
:param on_step_finish: Function called to return a response after each step execution
:param initial_payload_parser: Function to parse the initial payload passed by the user
:param receiver: Receiver to verify *all* requests by checking if they come from QStash. By default, a receiver is created from the env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY if they are set.
:param base_url: Base Url of the workflow endpoint. Can be used to set if there is a local tunnel or a proxy between QStash and the workflow endpoint. Will be set to the env variable UPSTASH_WORKFLOW_URL if not passed. If the env variable is not set, the url will be infered as usual from the `request.url` or the `url` parameter in `serve` options.
:param env: Optionally, one can pass an env object mapping environment variables to their keys. Useful in cases like cloudflare with hono.
:param retries: Number of retries to use in workflow requests, 3 by default
:param url: Url of the endpoint where the workflow is set up. If not set, url will be inferred from the request.
:return:
"""

def decorator(
route_function: Union[
RouteFunction[TInitialPayload], AsyncRouteFunction[TInitialPayload]
Expand Down
42 changes: 42 additions & 0 deletions upstash_workflow/serve/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,43 @@
class DisabledWorkflowContext(
Generic[TInitialPayload], WorkflowContext[TInitialPayload]
):
"""
Workflow context which throws `WorkflowAbort` before running the steps.

Used for making a dry run before running any steps to check authentication.

Consider an endpoint like this:
```python
@serve.post("/auth")
async def auth(context: WorkflowContext[str]) -> None:
if context.headers.get("authentication" != "Bearer secret_password"):
print("Authentication failed.")
return
# ...
```

the `serve` method will first call the route_function with a `DisabledWorkflowContext`.
Here is the action we take in different cases:
- "step-found": we will run the workflow related sections of `serve`.
- "run-ended": simply return success and end the workflow
- error: returns 500.
"""

__disabled_message = "disabled-qstash-worklfow-run"

def _add_step(self, _step: BaseLazyStep[TResult]) -> TResult:
"""
Overwrite the `WorkflowContext._add_step` method to always raise `WorkflowAbort`
error in order to stop the execution whenever we encounter a step.

:param _step:
"""
raise WorkflowAbort(self.__disabled_message)

def cancel(self) -> None:
"""
overwrite cancel method to do nothing
"""
return

@classmethod
Expand All @@ -25,6 +56,17 @@ def try_authentication(
route_function: Callable[[WorkflowContext[TInitialPayload]], None],
context: WorkflowContext[TInitialPayload],
) -> Literal["run-ended", "step-found"]:
"""
copies the passed context to create a DisabledWorkflowContext. Then, runs the
route function with the new context.

- returns "run-ended" if there are no steps found or
if the auth failed and user called `return`
- returns "step-found" if DisabledWorkflowContext._add_step is called.
- if there is another error, returns the error.

:param route_function:
"""
disabled_context = DisabledWorkflowContext(
qstash_client=QStash(base_url="disabled-client", token="disabled-client"),
workflow_run_id=context.workflow_run_id,
Expand Down
13 changes: 13 additions & 0 deletions upstash_workflow/serve/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ def process_options(
retries: Optional[int] = DEFAULT_RETRIES,
url: Optional[str] = None,
) -> WorkflowServeOptions[TInitialPayload, TResponse]:
"""
Fills the options with default values if they are not provided.

Default values for:
- qstash_client: QStash client created with QSTASH_TOKEN env var
- on_step_finish: returns a Response with workflowRunId in the body (status: 200)
- initial_payload_parser: calls json.loads if initial request body exists.
- receiver: a Receiver if the required env vars are set
- base_url: env variable UPSTASH_WORKFLOW_URL
- env: os.environ
- retries: DEFAULT_RETRIES
- url: None
"""
environment = env if env is not None else dict(os.environ)

receiver_environment_variables_set = bool(
Expand Down
24 changes: 24 additions & 0 deletions upstash_workflow/serve/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ def serve(
retries: Optional[int] = None,
url: Optional[str] = None,
) -> Dict[str, Callable[[TRequest], TResponse]]:
"""
Creates a method that handles incoming requests and runs the provided
route function as a workflow.

:param route_function: A function that uses WorkflowContext as a parameter and runs a workflow.
:param qstash_client: QStash client
:param on_step_finish: Function called to return a response after each step execution
:param initial_payload_parser: Function to parse the initial payload passed by the user
:param receiver: Receiver to verify *all* requests by checking if they come from QStash. By default, a receiver is created from the env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY if they are set.
:param base_url: Base Url of the workflow endpoint. Can be used to set if there is a local tunnel or a proxy between QStash and the workflow endpoint. Will be set to the env variable UPSTASH_WORKFLOW_URL if not passed. If the env variable is not set, the url will be infered as usual from the `request.url` or the `url` parameter in `serve` options.
:param env: Optionally, one can pass an env object mapping environment variables to their keys. Useful in cases like cloudflare with hono.
:param retries: Number of retries to use in workflow requests, 3 by default
:param url: Url of the endpoint where the workflow is set up. If not set, url will be inferred from the request.
:return: An method that consumes incoming requests and runs the workflow.
"""
processed_options = process_options(
qstash_client=qstash_client,
on_step_finish=on_step_finish,
Expand All @@ -61,6 +76,15 @@ def serve(
url = processed_options.url

def _handler(request: TRequest) -> TResponse:
"""
Handles the incoming request, triggering the appropriate workflow steps.
Calls `trigger_first_invocation()` if it's the first invocation.
Otherwise, starts calling `trigger_route_function()` to execute steps in the workflow.
Finally, calls `trigger_workflow_delete()` to remove the workflow from QStash.

:param request: The incoming request to handle.
:return: A response.
"""
workflow_url = determine_urls(cast(Request, request), url, base_url)

request_payload = get_payload(request) or ""
Expand Down
Loading
Loading