Skip to content

Commit

Permalink
Test DLQ (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
MeshanKhosla authored Jan 11, 2024
1 parent 33aa28d commit 0e2c55f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 12 deletions.
42 changes: 42 additions & 0 deletions tests/asyncio/test_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
import asyncio
from upstash_qstash.asyncio import Client
from qstash_token import QSTASH_TOKEN


@pytest.fixture
def client():
return Client(QSTASH_TOKEN)


@pytest.mark.asyncio
async def test_dlq(client):
print("Publishing to a failed endpoint")
pub_res = await client.publish_json({"url": "http://httpstat.us/404", "retries": 0})

msg_id = pub_res["messageId"]
assert msg_id is not None

print("Waiting 3 seconds for event to be delivered")
await asyncio.sleep(3)

print("Checking if message is in DLQ")
dlq = client.dlq()
all_messages_res = await dlq.list_messages()
all_messages = all_messages_res["messages"]

msg_sent = list(filter(lambda msg: msg["messageId"] == msg_id, all_messages))
assert len(msg_sent) == 1

dlq_id = msg_sent[0]["dlqId"]

print("Deleting message from DLQ")
await dlq.delete(dlq_id)

print("Checking if message is deleted from DLQ")
all_messages_after_delete_res = await dlq.list_messages()
all_messages_after_delete = all_messages_after_delete_res["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0
12 changes: 6 additions & 6 deletions tests/asyncio/test_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ async def test_topic_lifecycle(client):
{
"name": topic_name,
"endpoints": [
{"url": "https://example.com/endpoint1"},
{"url": "https://example.com/endpoint2"},
{"url": "https://qstash-endpoint1.requestcatcher.com"},
{"url": "https://qstash-endpoint2.requestcatcher.com"},
],
}
)
Expand All @@ -36,11 +36,11 @@ async def test_topic_lifecycle(client):
get_res = await topics.get(topic_name)
assert get_res["name"] == topic_name
assert any(
endpoint["url"] == "https://example.com/endpoint1"
endpoint["url"] == "https://qstash-endpoint1.requestcatcher.com"
for endpoint in get_res["endpoints"]
)
assert any(
endpoint["url"] == "https://example.com/endpoint2"
endpoint["url"] == "https://qstash-endpoint2.requestcatcher.com"
for endpoint in get_res["endpoints"]
)

Expand All @@ -61,7 +61,7 @@ async def test_topic_lifecycle(client):
{
"name": topic_name,
"endpoints": [
{"url": "https://example.com/endpoint1"},
{"url": "https://qstash-endpoint1.requestcatcher.com"},
],
}
)
Expand All @@ -72,7 +72,7 @@ async def test_topic_lifecycle(client):
print(f"Checking if endpoint1 have been removed from '${topic_name}'")
get_res = await topics.get(topic_name)
assert not any(
endpoint["url"] == "https://example.com/endpoint1"
endpoint["url"] == "https://qstash-endpoint1.requestcatcher.com"
for endpoint in get_res["endpoints"]
)

Expand Down
39 changes: 39 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest
import time
from upstash_qstash import Client
from qstash_token import QSTASH_TOKEN


@pytest.fixture
def client():
return Client(QSTASH_TOKEN)


def test_dlq(client):
print("Publishing to a failed endpoint")
pub_res = client.publish_json({"url": "http://httpstat.us/404", "retries": 0})

msg_id = pub_res["messageId"]
assert msg_id is not None

print("Waiting 3 seconds for event to be delivered")
time.sleep(3)

print("Checking if message is in DLQ")
dlq = client.dlq()
all_messages = dlq.list_messages()["messages"]

msg_sent = list(filter(lambda msg: msg["messageId"] == msg_id, all_messages))
assert len(msg_sent) == 1

dlq_id = msg_sent[0]["dlqId"]

print("Deleting message from DLQ")
dlq.delete(dlq_id)

print("Checking if message is deleted from DLQ")
all_messages_after_delete = dlq.list_messages()["messages"]
msg_deleted = list(
filter(lambda msg: msg["messageId"] == msg_id, all_messages_after_delete)
)
assert len(msg_deleted) == 0
12 changes: 6 additions & 6 deletions tests/test_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def test_topic_lifecycle(client):
{
"name": topic_name,
"endpoints": [
{"url": "https://example.com/endpoint1"},
{"url": "https://example.com/endpoint2"},
{"url": "https://qstash-endpoint1.requestcatcher.com"},
{"url": "https://qstash-endpoint2.requestcatcher.com"},
],
}
)
Expand All @@ -34,11 +34,11 @@ def test_topic_lifecycle(client):
get_res = topics.get(topic_name)
assert get_res["name"] == topic_name
assert any(
endpoint["url"] == "https://example.com/endpoint1"
endpoint["url"] == "https://qstash-endpoint1.requestcatcher.com"
for endpoint in get_res["endpoints"]
)
assert any(
endpoint["url"] == "https://example.com/endpoint2"
endpoint["url"] == "https://qstash-endpoint2.requestcatcher.com"
for endpoint in get_res["endpoints"]
)

Expand All @@ -60,7 +60,7 @@ def test_topic_lifecycle(client):
{
"name": topic_name,
"endpoints": [
{"url": "https://example.com/endpoint1"},
{"url": "https://qstash-endpoint1.requestcatcher.com"},
],
}
)
Expand All @@ -71,7 +71,7 @@ def test_topic_lifecycle(client):
print(f"Checking if endpoint1 have been removed from '${topic_name}'")
get_res = topics.get(topic_name)
assert not any(
endpoint["url"] == "https://example.com/endpoint1"
endpoint["url"] == "https://qstash-endpoint1.requestcatcher.com"
for endpoint in get_res["endpoints"]
)

Expand Down

0 comments on commit 0e2c55f

Please sign in to comment.