AsyncHttpConsumer improvements #1255

Open · wants to merge 7 commits into base: main

37 changes: 31 additions & 6 deletions channels/generic/http.py
@@ -12,6 +12,8 @@ class AsyncHttpConsumer(AsyncConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body = []
self.headers_sent = False
self.more_body = True

async def send_headers(self, *, status=200, headers=None):
"""
@@ -27,6 +29,8 @@ async def send_headers(self, *, status=200, headers=None):
elif isinstance(headers, dict):
headers = list(headers.items())

self.headers_sent = True

await self.send(
{"type": "http.response.start", "status": status, "headers": headers}
)
@@ -40,9 +44,12 @@ async def send_body(self, body, *, more_body=False):
the channel will be ignored.
"""
assert isinstance(body, bytes), "Body is not bytes"
await self.send(
{"type": "http.response.body", "body": body, "more_body": more_body}
)

if self.more_body:
self.more_body = more_body
await self.send(
{"type": "http.response.body", "body": body, "more_body": more_body}
)

async def send_response(self, status, body, **kwargs):
"""
@@ -70,6 +77,19 @@ async def disconnect(self):
"""
pass

async def close(self, body=b"", status=500, headers=None):
"""
Closes the HTTP response from the server end.
"""
if not self.more_body:
# HTTP Response is already closed, nothing to do.
return

if not self.headers_sent:
await self.send_headers(status=status, headers=headers)

await self.send_body(body)

async def http_request(self, message):
"""
Async entrypoint - concatenates body fragments and hands off control
@@ -80,10 +100,15 @@ async def http_request(self, message):
if not message.get("more_body"):
try:
await self.handle(b"".join(self.body))
finally:
await self.disconnect()
raise StopConsumer()
except StopConsumer:
await self.close(status=200)
except:
# TODO This is just a patch; after bubbling up the exception, nobody is calling http_disconnect.
await self.close(body=b"Internal Server Error", status=500)
Contributor:

I wonder if it would be nicer to have an exception_handler method on the AsyncHttpConsumer that sends this 500 down the wire.

In some cases we need to format this 500 response a little differently (e.g., if we got a Redis connection exception we might well want to push some Retry-After headers on the response).
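
For illustration, a minimal sketch of what such a hook could look like, assuming a hypothetical exception_handler method (no such hook exists on AsyncHttpConsumer today); the use of ConnectionError for the Redis case is also just an assumption:

```python
from channels.generic.http import AsyncHttpConsumer


class ExampleConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        ...  # normal request handling

    async def exception_handler(self, exc):
        # Hypothetical hook, called by the base class if handle() raises.
        if isinstance(exc, ConnectionError):  # e.g. a Redis connection failure
            await self.send_response(
                503,
                b"Service temporarily unavailable",
                headers=[(b"Retry-After", b"5")],
            )
        else:
            await self.send_response(500, b"Internal Server Error")
```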

Author:


Interesting... if you want to format it differently, you can always catch the exception in your class and handle it in your code.
Honestly, I'm not familiar enough with the code base. I think that when any exception bubbles up, the HTTP connection should be closed and the consumer stopped, but that doesn't happen for some reason. So this is the fix that I found.

@adamhooper (Sep 24, 2019):


btw ... be sure to re-raise asyncio.CancelledError. If you don't re-raise it, asyncio produces undefined behavior.

For that matter ... shouldn't every exception be re-raised? I'm a keen user of asyncio.EventLoop.set_exception_handler() -- it helps find bugs in production. (I imagine this is why finally was used in the first place.)

(To be clear: I don't know much about Channels' innards. I'm just worried because changing catch-all behavior seems like it'll have far-reaching effects....)
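
To make the suggestion concrete, here is a minimal sketch (not part of this PR) of an http_request that handles errors but lets cancellation propagate, assuming the close() helper added in this diff:

```python
import asyncio

from channels.exceptions import StopConsumer
from channels.generic.http import AsyncHttpConsumer


class SafeHttpConsumer(AsyncHttpConsumer):
    async def http_request(self, message):
        self.body.append(message.get("body", b""))
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            except StopConsumer:
                await self.close(status=200)
            except asyncio.CancelledError:
                # Never swallow cancellation; re-raise so the task can be
                # cancelled cleanly.
                raise
            except Exception:
                await self.close(body=b"Internal Server Error", status=500)
                # Re-raise so loop.set_exception_handler() / server logging
                # still sees the original exception.
                raise
```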

Contributor:


I agree unexpected exceptions should not be swallowed, so catching in self.handle and sending a response would not be ideal.

The reason for wanting to be able to manage the 500 response body is that for our server we want to return a JSON error body that is parsable. At the moment our solution for this is to put a middleware layer between the Consumer and the top-level Application that proxies all send messages; however, since this layer does not have access to the source exception, it can't produce more than a very generic error.

@adamhooper The issue with the finally approach currently in master is that it effectively swallows your exception: since the finally block raises StopConsumer, any exception that was raised is then dropped by the AsyncConsumer parent class's __call__ method.
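
For context, a rough sketch of the kind of proxying middleware being described, assuming an ASGI 3 single-callable application; the class name and JSON shape are made up, and, as noted above, it never sees the original exception:

```python
import json


class JsonErrorMiddleware:
    """Wraps an ASGI application and rewrites plain 5xx responses into a
    generic JSON error body (without access to the source exception)."""

    def __init__(self, inner):
        self.inner = inner

    async def __call__(self, scope, receive, send):
        is_error = False

        async def wrapped_send(message):
            nonlocal is_error
            if message["type"] == "http.response.start" and message["status"] >= 500:
                is_error = True
                message = {
                    **message,
                    "headers": [(b"content-type", b"application/json")],
                }
            elif message["type"] == "http.response.body" and is_error:
                body = json.dumps({"error": "internal server error"}).encode("utf-8")
                message = {**message, "body": body}
            await send(message)

        await self.inner(scope, receive, wrapped_send)
```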

Contributor:


@carltongibson what do we think about set_exception_handler or CancelledError being used here? And/or having us create an exception_handler method to keep the flow more organized?

Are any of the pieces in this discussion required vs. optional, or is the flow a bit beyond straightforward rule-logic?

@carltongibson (Member, Mar 1, 2020):


My thought on this, when I looked at it, was that we're entering a losing battle. (I catch an exception, try to call close, and end up awaiting the body... but hang on! I was closing.)

Rather, for this kind of "something I really wasn't expecting went wrong" case, I think we should just let the exception bubble up to Daphne, where it will be handled with a plain response (and perhaps we can add some logging to help with the related issues around this space...).

Then, assuming that all looks good, we need to make sure Daphne sends the appropriate http_disconnect to stop the consumer. (Which looks like it's not happening.)

(Of course, this isn't fresh in my mind so I may need to fire up the debugger again to get back on top of it.)

raise

# TODO This should be called by daphne whenever any exception bubbles up (As it does with websockets)
# IMHO this should be parallel to websocket_disconnect, removing the consumer from potential channel_layer groups.
async def http_disconnect(self, message):
"""
Let the user do their cleanup and close the consumer.
35 changes: 34 additions & 1 deletion channels/testing/http.py
@@ -31,7 +31,7 @@ def __init__(self, application, method, path, body=b"", headers=None):

async def get_response(self, timeout=1):
"""
Get the application's response. Returns a dict with keys of
Get the application's full response. Returns a dict with keys of
"body", "headers" and "status".
"""
# If we've not sent the request yet, do so
@@ -54,3 +54,36 @@ async def get_response(self, timeout=1):
del response_start["type"]
response_start.setdefault("headers", [])
return response_start

async def send_request(self):
"""
Sends the request to the application without waiting for
headers or any response.
"""
if not self.sent_request:
self.sent_request = True
await self.send_input({"type": "http.request", "body": self.body})

async def get_response_start(self, timeout=1):
"""
Gets the start of the response (its headers and status code)
"""
response_start = await self.receive_output(timeout)
assert response_start["type"] == "http.response.start"

# Return structured info
del response_start["type"]
response_start.setdefault("headers", [])
return response_start

async def get_body_chunk(self, timeout=1):
"""
Gets one chunk of body.
"""
chunk = await self.receive_output(timeout)
assert chunk["type"] == "http.response.body"
assert isinstance(chunk["body"], bytes)
if not chunk.get("more_body", False):
await self.wait(timeout)

return chunk["body"]
63 changes: 63 additions & 0 deletions tests/test_generic_http.py
@@ -1,8 +1,10 @@
import json

import pytest
from django.test import override_settings

from channels.generic.http import AsyncHttpConsumer
from channels.layers import get_channel_layer
from channels.testing import HttpCommunicator


@@ -32,3 +34,64 @@ async def handle(self, body):
assert response["body"] == b'{"value": 42}'
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"application/json")]


@pytest.mark.asyncio
async def test_async_http_consumer_with_channel_layer():
"""
Tests that AsyncHttpConsumer works correctly with a channel layer (long-poll style).
"""

class TestConsumer(AsyncHttpConsumer):
"""
Consumer that joins a channel-layer group and echoes events received
from that group back as the HTTP response body (long-poll style).
"""

channel_layer_alias = "testlayer"

async def handle(self, body):
# Add consumer to a known test group that we will use to send events to.
await self.channel_layer.group_add("test_group", self.channel_name)
await self.send_headers(
status=200, headers=[(b"Content-Type", b"application/json")]
)

async def send_to_long_poll(self, event):
received_data = str(event["data"]).encode("utf8")
# We just echo what we receive, and close the response.
await self.send_body(received_data, more_body=False)

channel_layers_setting = {
"testlayer": {"BACKEND": "channels.layers.InMemoryChannelLayer"}
}

with override_settings(CHANNEL_LAYERS=channel_layers_setting):
# Open a connection
communicator = HttpCommunicator(
TestConsumer,
method="POST",
path="/test/",
body=json.dumps({"value": 42, "anything": False}).encode("utf-8"),
)

# We issue the HTTP request
await communicator.send_request()

# Gets the response start (status and headers)
response_start = await communicator.get_response_start(timeout=1)

# Make sure that the start of the response looks good so far.
assert response_start["status"] == 200
assert response_start["headers"] == [(b"Content-Type", b"application/json")]

# Now send a message to the consumer through the channel layer, using the known test_group.
channel_layer = get_channel_layer("testlayer")
await channel_layer.group_send(
"test_group",
{"type": "send.to.long.poll", "data": "hello from channel layer"},
)

# Now we should be able to get the message back on the remaining chunk of body.
body = await communicator.get_body_chunk(timeout=1)
assert body == b"hello from channel layer"
Contributor:

Here too: do/should we check for connection close?

43 changes: 43 additions & 0 deletions tests/test_http_stream.py
@@ -0,0 +1,43 @@
import asyncio

import pytest

from channels.generic.http import AsyncHttpConsumer
from channels.testing import HttpCommunicator


@pytest.mark.asyncio
async def test_async_http_consumer():
"""
Tests that AsyncHttpConsumer correctly handles a streamed (chunked) response.
"""

class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
self.is_streaming = True
await self.send_headers(
headers=[
(b"Cache-Control", b"no-cache"),
(b"Content-Type", b"text/event-stream"),
(b"Transfer-Encoding", b"chunked"),
]
)
asyncio.get_event_loop().create_task(self.stream())

async def stream(self):
for n in range(0, 3):
if not self.is_streaming:
break
payload = "data: %d\n\n" % (n + 1)
await self.send_body(payload.encode("utf-8"), more_body=True)
await asyncio.sleep(0.2)
await self.send_body(b"")

async def disconnect(self):
self.is_streaming = False

# Open a connection
communicator = HttpCommunicator(TestConsumer, method="GET", path="/test/", body=b"")
response = await communicator.get_response()
assert response["body"] == b"data: 1\n\ndata: 2\n\ndata: 3\n\n"
assert response["status"] == 200
@jheld (Contributor, Mar 1, 2020):

Is there a way to test that the connection has closed? And if so, is there any use in doing that? I'm asking because there is a logic change here around the disconnect flow, so is it worthwhile to check that we really did disconnect?

I realize this is probably for SSE (server-sent events), but I just want to make sure that we aren't forgetting to check for the close, if we can/should.
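
One possible way to check this with the existing test plumbing, as a sketch only (send_input, wait, and receive_nothing already exist on ApplicationCommunicator, which HttpCommunicator builds on): after reading the response, deliver an explicit http.disconnect and assert that the consumer actually stops.

```python
# Hypothetical additions at the end of the streaming test above.
response = await communicator.get_response()
assert response["body"] == b"data: 1\n\ndata: 2\n\ndata: 3\n\n"

# Simulate the client going away.
await communicator.send_input({"type": "http.disconnect"})

# http_disconnect() calls disconnect() and raises StopConsumer, so the
# application task should finish; wait() re-raises anything it died with.
await communicator.wait(timeout=1)

# Nothing further should be sent after the response was closed.
assert await communicator.receive_nothing() is True
```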