Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1568: raise errors on qstash token missing #21

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "upstash-workflow"
version = "0.0.1-rc.1"
version = "0.0.1-rc.2"
description = "Python SDK for Upstash Workflow"
license = "MIT"
authors = ["Upstash <[email protected]>"]
Expand Down Expand Up @@ -29,8 +29,7 @@ packages = [{ include = "upstash_workflow" }]

[tool.poetry.dependencies]
python = "^3.8"
httpx = ">=0.23.0, <1"
qstash = "^2.0.2"
qstash = "^2.0.3"

[tool.poetry.group.fastapi.dependencies]
fastapi = "^0.115.0"
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.1-rc.1"
__version__ = "0.0.1-rc.2"

from upstash_workflow.context.context import WorkflowContext
from upstash_workflow.serve.serve import serve
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/asyncio/serve/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
return ServeBaseOptions[TInitialPayload, TResponse](
qstash_client=qstash_client
or AsyncQStash(
cast(str, environment.get("QSTASH_TOKEN", "")),
cast(str, environment.get("QSTASH_TOKEN")),
),
on_step_finish=on_step_finish or _on_step_finish,
initial_payload_parser=initial_payload_parser or _initial_payload_parser,
Expand Down
13 changes: 5 additions & 8 deletions upstash_workflow/asyncio/workflow_requests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import httpx
import json
import base64
import logging
Expand Down Expand Up @@ -68,13 +67,11 @@ async def _trigger_workflow_delete(
workflow_context: AsyncWorkflowContext[TInitialPayload],
cancel: Optional[bool] = False,
) -> None:
async with httpx.AsyncClient() as client:
await client.delete(
f"https://qstash.upstash.io/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
headers={
"Authorization": f"Bearer {workflow_context.env.get('QSTASH_TOKEN', '')}"
},
)
await workflow_context.qstash_client.http.request(
path=f"/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
method="DELETE",
parse_response=False,
)


async def _handle_third_party_call_result(
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from inspect import iscoroutinefunction
import json
import os
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from typing import Callable, Awaitable, cast, TypeVar, Optional, Dict
Expand Down Expand Up @@ -45,6 +46,15 @@ def post(
:return:
"""

if not (
qstash_client
or (env is not None and env.get("QSTASH_TOKEN"))
or (env is None and os.getenv("QSTASH_TOKEN"))
):
raise ValueError(
"QSTASH_TOKEN is missing. Make sure to set it in the environment variables or pass qstash_client or env as an argument."
)

def decorator(
route_function: AsyncRouteFunction[TInitialPayload],
) -> AsyncRouteFunction[TInitialPayload]:
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/flask.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from inspect import iscoroutinefunction
import os
from flask import Flask, request
from werkzeug.wrappers import Response
from typing import Callable, cast, TypeVar, Optional, Dict
Expand Down Expand Up @@ -49,6 +50,15 @@ def route(
:return:
"""

if not (
qstash_client
or (env is not None and env.get("QSTASH_TOKEN"))
or (env is None and os.getenv("QSTASH_TOKEN"))
):
raise ValueError(
"QSTASH_TOKEN is missing. Make sure to set it in the environment variables or pass qstash_client or env as an argument."
)

def decorator(
route_function: RouteFunction[TInitialPayload],
) -> RouteFunction[TInitialPayload]:
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/serve/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
return ServeBaseOptions[TInitialPayload, TResponse](
qstash_client=qstash_client
or QStash(
cast(str, environment.get("QSTASH_TOKEN", "")),
cast(str, environment.get("QSTASH_TOKEN")),
),
on_step_finish=on_step_finish or _on_step_finish,
initial_payload_parser=initial_payload_parser or _initial_payload_parser,
Expand Down
13 changes: 5 additions & 8 deletions upstash_workflow/workflow_requests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import httpx
import json
import base64
import logging
Expand Down Expand Up @@ -74,13 +73,11 @@ def _trigger_workflow_delete(
workflow_context: WorkflowContext[TInitialPayload],
cancel: Optional[bool] = False,
) -> None:
with httpx.Client() as client:
client.delete(
f"https://qstash.upstash.io/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
headers={
"Authorization": f"Bearer {workflow_context.env.get('QSTASH_TOKEN', '')}"
},
)
workflow_context.qstash_client.http.request(
path=f"/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
method="DELETE",
parse_response=False,
)


def _recreate_user_headers(headers: Dict[str, str]) -> Dict[str, str]:
Expand Down
Loading