Skip to content

Commit

Permalink
Merge pull request #19 from upstash/new-features
Browse files Browse the repository at this point in the history
Add support for new QStash features
  • Loading branch information
CahidArda authored Jun 13, 2024
2 parents 4bfda9b + 01fd1b5 commit a40041f
Show file tree
Hide file tree
Showing 13 changed files with 724 additions and 129 deletions.
52 changes: 40 additions & 12 deletions tests/asyncio/test_message.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Callable

import pytest

from tests import assert_eventually_async
Expand Down Expand Up @@ -191,9 +193,15 @@ async def test_batch_api_llm_async(async_qstash: AsyncQStash) -> None:


@pytest.mark.asyncio
async def test_enqueue_async(async_qstash: AsyncQStash) -> None:
async def test_enqueue_async(
async_qstash: AsyncQStash,
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue_async(async_qstash, name)

res = await async_qstash.message.enqueue(
queue="test_queue",
queue=name,
body="test-body",
url="https://example.com",
headers={
Expand All @@ -205,13 +213,17 @@ async def test_enqueue_async(async_qstash: AsyncQStash) -> None:

assert len(res.message_id) > 0

await async_qstash.queue.delete("test_queue")


@pytest.mark.asyncio
async def test_enqueue_json_async(async_qstash: AsyncQStash) -> None:
async def test_enqueue_json_async(
async_qstash: AsyncQStash,
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue_async(async_qstash, name)

res = await async_qstash.message.enqueue_json(
queue="test_queue",
queue=name,
body={"test": "body"},
url="https://example.com",
headers={
Expand All @@ -223,13 +235,17 @@ async def test_enqueue_json_async(async_qstash: AsyncQStash) -> None:

assert len(res.message_id) > 0

await async_qstash.queue.delete("test_queue")


@pytest.mark.asyncio
async def test_enqueue_api_llm_async(async_qstash: AsyncQStash) -> None:
async def test_enqueue_api_llm_async(
async_qstash: AsyncQStash,
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue_async(async_qstash, name)

res = await async_qstash.message.enqueue_json(
queue="test_queue",
queue=name,
body={
"model": "meta-llama/Meta-Llama-3-8B-Instruct",
"messages": [
Expand All @@ -247,8 +263,6 @@ async def test_enqueue_api_llm_async(async_qstash: AsyncQStash) -> None:

assert len(res.message_id) > 0

await async_qstash.queue.delete("test_queue")


@pytest.mark.asyncio
async def test_publish_to_url_group_async(async_qstash: AsyncQStash) -> None:
Expand All @@ -273,3 +287,17 @@ async def test_publish_to_url_group_async(async_qstash: AsyncQStash) -> None:

await assert_delivered_eventually_async(async_qstash, res[0].message_id)
await assert_delivered_eventually_async(async_qstash, res[1].message_id)


@pytest.mark.asyncio
async def test_timeout_async(async_qstash: AsyncQStash) -> None:
res = await async_qstash.message.publish_json(
body={"ex_key": "ex_value"},
url="https://example.com",
timeout=90,
)

assert isinstance(res, PublishResponse)
assert len(res.message_id) > 0

await assert_delivered_eventually_async(async_qstash, res.message_id)
63 changes: 53 additions & 10 deletions tests/asyncio/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,68 @@
from typing import Callable

import pytest

from upstash_qstash import AsyncQStash


@pytest.mark.asyncio
async def test_queue_async(async_qstash: AsyncQStash) -> None:
await async_qstash.queue.upsert(queue="test_queue", parallelism=1)
async def test_queue_async(
async_qstash: AsyncQStash,
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue_async(async_qstash, name)

queue = await async_qstash.queue.get("test_queue")
assert queue.name == "test_queue"
await async_qstash.queue.upsert(queue=name, parallelism=1)

queue = await async_qstash.queue.get(name)
assert queue.name == name
assert queue.parallelism == 1

await async_qstash.queue.upsert(queue="test_queue", parallelism=2)
await async_qstash.queue.upsert(queue=name, parallelism=2)

queue = await async_qstash.queue.get("test_queue")
assert queue.name == "test_queue"
queue = await async_qstash.queue.get(name)
assert queue.name == name
assert queue.parallelism == 2

all_queues = await async_qstash.queue.list()
assert any(True for q in all_queues if q.name == "test_queue")
assert any(True for q in all_queues if q.name == name)

await async_qstash.queue.delete("test_queue")
await async_qstash.queue.delete(name)

all_queues = await async_qstash.queue.list()
assert not any(True for q in all_queues if q.name == "test_queue")
assert not any(True for q in all_queues if q.name == name)


@pytest.mark.asyncio
async def test_queue_pause_resume_async(
async_qstash: AsyncQStash,
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue_async(async_qstash, name)

await async_qstash.queue.upsert(queue=name)

queue = await async_qstash.queue.get(name)
assert queue.paused is False

await async_qstash.queue.pause(name)

queue = await async_qstash.queue.get(name)
assert queue.paused is True

await async_qstash.queue.resume(name)

queue = await async_qstash.queue.get(name)
assert queue.paused is False

await async_qstash.queue.upsert(name, paused=True)

queue = await async_qstash.queue.get(name)
assert queue.paused is True

await async_qstash.queue.upsert(name, paused=False)

queue = await async_qstash.queue.get(name)
assert queue.paused is False
59 changes: 49 additions & 10 deletions tests/asyncio/test_schedules.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,64 @@
from typing import Callable

import pytest

from upstash_qstash import AsyncQStash


@pytest.mark.asyncio
async def test_schedule_lifecycle_async(async_qstash: AsyncQStash) -> None:
sched_id = await async_qstash.schedule.create_json(
cron="* * * * *",
async def test_schedule_lifecycle_async(
async_qstash: AsyncQStash,
cleanup_schedule_async: Callable[[AsyncQStash, str], None],
) -> None:
schedule_id = await async_qstash.schedule.create_json(
cron="1 1 1 1 1",
destination="https://example.com",
body={"ex_key": "ex_value"},
)

assert len(sched_id) > 0
cleanup_schedule_async(async_qstash, schedule_id)

res = await async_qstash.schedule.get(sched_id)
assert res.schedule_id == sched_id
assert res.cron == "* * * * *"
assert len(schedule_id) > 0

res = await async_qstash.schedule.get(schedule_id)
assert res.schedule_id == schedule_id
assert res.cron == "1 1 1 1 1"

list_res = await async_qstash.schedule.list()
assert any(s.schedule_id == sched_id for s in list_res)
assert any(s.schedule_id == schedule_id for s in list_res)

await async_qstash.schedule.delete(sched_id)
await async_qstash.schedule.delete(schedule_id)

list_res = await async_qstash.schedule.list()
assert not any(s.schedule_id == sched_id for s in list_res)
assert not any(s.schedule_id == schedule_id for s in list_res)


@pytest.mark.asyncio
async def test_schedule_pause_resume_async(
async_qstash: AsyncQStash,
cleanup_schedule_async: Callable[[AsyncQStash, str], None],
) -> None:
schedule_id = await async_qstash.schedule.create_json(
cron="1 1 1 1 1",
destination="https://example.com",
body={"ex_key": "ex_value"},
)

cleanup_schedule_async(async_qstash, schedule_id)

assert len(schedule_id) > 0

res = await async_qstash.schedule.get(schedule_id)
assert res.schedule_id == schedule_id
assert res.cron == "1 1 1 1 1"
assert res.paused is False

await async_qstash.schedule.pause(schedule_id)

res = await async_qstash.schedule.get(schedule_id)
assert res.paused is True

await async_qstash.schedule.resume(schedule_id)

res = await async_qstash.schedule.get(schedule_id)
assert res.paused is False
89 changes: 89 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
from typing import Callable

import pytest
import pytest_asyncio

Expand All @@ -13,3 +16,89 @@ def qstash():
@pytest_asyncio.fixture
async def async_qstash():
return AsyncQStash(token=QSTASH_TOKEN)


@pytest.fixture
def cleanup_queue(request: pytest.FixtureRequest) -> Callable[[QStash, str], None]:
queue_names = []

def register(qstash: QStash, queue_name: str) -> None:
queue_names.append((qstash, queue_name))

def delete():
for qstash, queue_name in queue_names:
try:
qstash.queue.delete(queue_name)
except Exception:
pass

request.addfinalizer(delete)

return register


@pytest_asyncio.fixture
def cleanup_queue_async(
request: pytest.FixtureRequest,
) -> Callable[[AsyncQStash, str], None]:
queue_names = []

def register(async_qstash: AsyncQStash, queue_name: str) -> None:
queue_names.append((async_qstash, queue_name))

def finalizer():
async def delete():
for async_qstash, queue_name in queue_names:
try:
await async_qstash.queue.delete(queue_name)
except Exception:
pass

asyncio.run(delete())

request.addfinalizer(finalizer)

return register


@pytest.fixture
def cleanup_schedule(request: pytest.FixtureRequest) -> Callable[[QStash, str], None]:
schedule_ids = []

def register(qstash: QStash, schedule_id: str) -> None:
schedule_ids.append((qstash, schedule_id))

def delete():
for qstash, schedule_id in schedule_ids:
try:
qstash.schedule.delete(schedule_id)
except Exception:
pass

request.addfinalizer(delete)

return register


@pytest_asyncio.fixture
def cleanup_schedule_async(
request: pytest.FixtureRequest,
) -> Callable[[AsyncQStash, str], None]:
schedule_ids = []

def register(async_qstash: AsyncQStash, schedule_id: str) -> None:
schedule_ids.append((async_qstash, schedule_id))

def finalizer():
async def delete():
for async_qstash, schedule_id in schedule_ids:
try:
await async_qstash.schedule.delete(schedule_id)
except Exception:
pass

asyncio.run(delete())

request.addfinalizer(finalizer)

return register
Loading

0 comments on commit a40041f

Please sign in to comment.