Testing with Channels and Websockets #3602
Replies: 1 comment 3 replies
-
Hey, the code below should work. So what's different: I tried to figure out where your code was blocking - and it blocked when opening the websocket connection. This happens because it gets stuck waiting for However, we need to use "something" blocking to keep the websocket connection open. I decided to use Hope that helps :) from litestar import Litestar, websocket, WebSocket
from litestar.testing import create_test_client
from litestar.channels.backends.memory import MemoryChannelsBackend
from litestar.channels import ChannelsPlugin
import logging
def test_websocket():
@websocket("/ws")
async def ws(socket: WebSocket, channels: ChannelsPlugin) -> None:
await socket.accept()
async def _sender(message: str):
await socket.send_text(message)
async with channels.start_subscription(["my_new_channel"]) as subscriber:
async with subscriber.run_in_background(_sender):
await socket.receive_text()
channels = ChannelsPlugin(
backend=MemoryChannelsBackend(history=0), channels=["my_new_channel"], arbitrary_channels_allowed=True
)
with create_test_client(
route_handlers=[ws],
plugins=[channels]
) as client:
conn = client.websocket_connect("/ws")
with conn:
channels.publish("asdf", "my_new_channel")
message = conn.receive_text(timeout=1)
logging.info(message) |
Beta Was this translation helpful? Give feedback.
-
Hi,
I currently have an issue with testing a channels and websocket setup very similar to the docs. I create a TestClient and open a websocket_connection, but when sending a message via channels the receive_text() either blocks and prevents delivery of the message via channels or is skipped when I set block to False. I tried with a while True: loop but that did not work.
A second issue is that I cannot close the connection so the test runs forever. Is there a way to test this?
Beta Was this translation helpful? Give feedback.
All reactions