Too many websocket client disconnect, memory not release #6325
Comments
Your scripts are not full, unable to reproduce. |
This is the full script:
import asyncio
from websocket import create_connection
import requests

# pip install websocket
# pip install websocket-client


async def connect_ws(id):
    url = 'ws://localhost:8000/land/websocket-tunnel'
    # create_connection() and recv() are blocking calls from websocket-client
    ws = create_connection(url)
    response = ws.recv()
    print(str(id) + response)
    await asyncio.sleep(5)
    ws.close()


async def run():
    for i in range(10000):
        asyncio.ensure_future(connect_ws(i))
    await asyncio.sleep(1000)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
|
You forgot the server. Please save my time. |
I am so sorry. This is a simple, complete server. Start the server with this Python file, then run the script and watch the memory used. After the script has finished, you can request to
Note: you may change the max client connect use
import asyncio
import gc

import aiohttp
from aiohttp import web


async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        await ws.send_str('hi')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    break
                # Echo the text payload; send_str() expects a str, not the WSMessage
                await ws.send_str(msg.data)
    except (Exception, asyncio.CancelledError) as e:
        pass
    finally:
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return ws


async def gc_handler(request):
    # Force a full garbage collection on request
    gc.collect()
    return web.json_response({'status': 'ok'})


app = web.Application(client_max_size=1024 ** 10)
app.add_routes([web.get('/land/websocket-tunnel', websocket_handler),
                web.get('/land/gc', gc_handler)])

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8000)
|
Can you reproduce this issue? Is there a way to resolve it, or a temporary workaround? This is very serious on my server. I hope for your reply.
|
Any update on this? I also have a similar problem. My code is rather large and needs tidying up before I can post it here. The app starts at 100MB of memory and grows to 200MB after 1 day; I need to restart it every 2 weeks. It can grow to more than 2GB.
The app runs in Docker with python:3.10.13-slim, with only 2-3 ws clients connected. |
I can reproduce with the above code: It's necessary to set I'm struggling to make any sense of tracemalloc output, it's not clear what is happening currently. Maybe there's a better tool to use? I don't think I have time at the moment to look into it. |
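One way to make tracemalloc output easier to read is to compare two snapshots, taken before and after a batch of connections, so that only the growth is listed. The following is a minimal standard-library sketch, not something taken from this thread: `simulate_load` is a hypothetical stand-in for running the client script against the server, and it deliberately retains objects so that the diff has something to show.

```python
import tracemalloc


def simulate_load(store, n=1000):
    # Hypothetical stand-in for a batch of websocket connections:
    # it deliberately retains objects so the diff has something to report.
    for _ in range(n):
        store.append(bytearray(1024))


def top_growth(before, after, limit=5):
    # compare_to() returns StatisticDiff entries, largest change first.
    return after.compare_to(before, "lineno")[:limit]


tracemalloc.start(10)  # keep up to 10 frames per allocation
retained = []
before = tracemalloc.take_snapshot()
simulate_load(retained)
after = tracemalloc.take_snapshot()
tracemalloc.stop()

growth = top_growth(before, after)
for stat in growth:
    print(stat)
```

In a real server the two snapshots could be taken from a debug endpoint before and after the client script runs. Note that tracemalloc only sees allocations made through Python's allocators, so growth that happens elsewhere would not appear in this diff.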
@bdraco I suspect this may affect homeassistant, so might be worth digging into if you have time. |
I'll take a look this weekend if I can find some free cycles. I may have already fixed this problem in HA and need to upstream the fix. (Or it's a different leak) |
It does look like something is leaking Python objects
These seem to be growing over time |
Also, it doesn't look like the problem I was seeing in HA |
If I don't create the |
Creating |
This doesn't appear to leak either |
async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5)
    await ws.prepare(request)
    await ws.send_str("hi")
    print(process.memory_info().rss)
    await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
It leaks as soon as the |
leaks with compress=False as well |
leak seems to be the transport. It goes away with
diff --git a/aiohttp/http_websocket.py b/aiohttp/http_websocket.py
index 475b1f78..25907c62 100644
--- a/aiohttp/http_websocket.py
+++ b/aiohttp/http_websocket.py
@@ -729,3 +729,7 @@ class WebSocketWriter:
             )
         finally:
             self._closing = True
+            self.transport.close()
+            self.transport = None
+            self.protocol = None
+
|
This isn't enough. without the |
I can reduce the leak with
@@ -324,14 +329,17 @@ class RequestHandler(BaseProtocol):
         if self._keepalive_handle is not None:
             self._keepalive_handle.cancel()
+            self._keepalive_handle = None
         if self._current_request is not None:
             if exc is None:
                 exc = ConnectionResetError("Connection lost")
             self._current_request._cancel(exc)
+            self._current_request = None
         if self._waiter is not None:
             self._waiter.cancel()
+            self._waiter = None
         if handler_cancellation and self._task_handler is not None:
             self._task_handler.cancel()
@@ -342,6 +350,14 @@ class RequestHandler(BaseProtocol):
             self._payload_parser.feed_eof()
But I haven't been able to find what is holding the reference |
We do this in client_proto, so maybe it's just an oversight that it's not being done in the websockets? aiohttp/aiohttp/client_proto.py Lines 69 to 75 in 669109f
|
Or, as we have that protocol handy already, maybe we just need to do
As for what's holding the reference, my suspicion is that until the transport is closed, it has either a callback method:
So, there's probably a circular reference of some kind there, which is also referenced in the loop, until the close() method is called. Though, that also suggests that if the user doesn't call .close(), then we'll still have the same leak. It should get closed when the connection is lost... |
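To illustrate the kind of circular reference suspected here, the following minimal sketch shows how such a cycle behaves in CPython. `Writer` and `Transport` are hypothetical stand-ins, not aiohttp's or asyncio's classes, and the sketch does not claim that this is the actual cycle in aiohttp. The pair survives `del` while the cycle is intact, and is freed either by the cyclic collector or by breaking the cycle explicitly in `close()`.

```python
import gc
import weakref


class Transport:
    # Hypothetical stand-in; not asyncio's transport.
    def __init__(self):
        self.callback = None


class Writer:
    # Hypothetical stand-in for an object that owns a transport.
    def __init__(self, transport):
        self.transport = transport
        # A bound method references self: writer -> transport -> writer.
        transport.callback = self.on_event

    def on_event(self):
        pass

    def close(self):
        # Breaking the cycle lets plain reference counting free both objects.
        self.transport.callback = None
        self.transport = None


gc.disable()  # keep the automatic collector out of the demonstration

leaked = Writer(Transport())
leaked_ref = weakref.ref(leaked)
del leaked
alive_before_collect = leaked_ref() is not None  # the cycle keeps it alive
gc.collect()
alive_after_collect = leaked_ref() is not None  # the collector found the cycle

closed = Writer(Transport())
closed_ref = weakref.ref(closed)
closed.close()
del closed
alive_after_close = closed_ref() is not None  # freed without the collector

gc.enable()
print(alive_before_collect, alive_after_collect, alive_after_close)
```

A cycle like this is only a delay, not a permanent leak, as long as the collector can reach it. If something outside the cycle (such as the event loop) also holds a reference, `gc.collect()` will not free it.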
I dug for a bit more on this but didn't find it. Out of time for today but will dig more this weekend |
Closing the transport there only solves whatever is leaking in the send path. As soon as I add back the receive, it looks like it keeps holding the reference forever and never gets GCed. We also can't close the transport there, because it means anything in the buffer will not get sent before it's closed. |
It looks like
|
But if I wait long enough, they do get cleaned up... hmm |
{"status": "ok", "mem": 151601152, "WebSocketResponse": 0, "WebSocketWriter": 0, "Transport": 0, "TimerHandle": 2, "FlowControlDataQueue": 0, "CIMultiDict": 3, "deque": 4, "common": [["function", 7339], ["tuple", 4882], ["dict", 3309], ["ReferenceType", 1672], ["wrapper_descriptor", 1495], ["cell", 1144], ["builtin_function_or_method", 1086], ["method_descriptor", 1073], ["type", 1073], ["getset_descriptor", 1041], ["list", 862], ["property", 739], ["member_descriptor", 666], ["module", 316], ["ModuleSpec", 312], ["frozenset", 258], ["_tuplegetter", 250], ["SourceFileLoader", 237], ["set", 205], ["classmethod", 200], ["_GenericAlias", 189], ["staticmethod", 183], ["_abc_data", 163], ["_UnionGenericAlias", 156], ["ABCMeta", 150], ["Pattern", 93], ["_NamedIntConstant", 76], ["Field", 74], ["HTTPStatus", 62], ["classmethod_descriptor", 52], ["EnumType", 52], ["method", 49], ["ForwardRef", 45], ["IPv6Address", 43], ["reify", 38], ["_SpecialGenericAlias", 37], ["_TLSAlertType", 34], ["ExtensionFileLoader", 33], ["Signals", 31], ["Negsignal", 31], ["IPv4Address", 30], ["_DataclassParams", 29], ["_lru_cache_wrapper", 28], ["IPv6Network", 28], ["FileFinder", 27], ["AlertDescription", 27], ["_CallableGenericAlias", 24], ["cached_property", 24], ["_TLSMessageType", 22], ["TypeVar", 22], ["IPv4Network", 19], ["CType", 19], ["_Precedence", 18], ["SimpleNamespace", 17], ["BufferFlags", 17], ["Logger", 16], ["RegexFlag", 15], ["Options", 15], ["_SpecialForm", 14], ["cython_function_or_method", 14], ["builtin_method", 13], ["WSCloseCode", 13], ["scputimes", 13], ["Month", 12], ["Context", 12], ["AddressFamily", 10], ["MsgFlag", 10], ["AddressInfo", 10], ["_ProtocolMeta", 10], ["partial", 10], ["_Feature", 10], ["SSLErrorNumber", 9], ["GenericAlias", 9], ["HTTPMethod", 9], ["WSMsgType", 9], ["defaultdict", 8], ["_State", 8], ["SplitResult", 8], ["lock", 7], ["VerifyFlags", 7], ["TLSVersion", 7], ["Day", 7], ["Struct", 7], ["KeyMethod", 7], ["_SSLMethod", 6], ["_TLSContentType", 
6], ["URL", 6], ["ResourceRoute", 6], ["coroutine", 6], ["Handle", 6], ["WeakSet", 5], ["SocketKind", 5], ["_ABC", 5], ["_ParameterKind", 5], ["SSLProtocolState", 5], ["ChunkState", 5], ["FlagBoundary", 4], ["method-wrapper", 4], ["RLock", 4], ["deque", 4], ["PlaceHolder", 4], ["AppProtocolState", 4], ["_BarrierState", 4], ["ParseState", 4], ["WSParserState", 4], ["UUID", 4], ["Signal", 4], ["socket", 4], ["TextIOWrapper", 3], ["FileIO", 3], ["_Printer", 3], ["EnumCheck", 3], ["count", 3], ["Sigmasks", 3], ["VerifyMode", 3], ["_SendfileMode", 3], ["_FIELD_BASE", 3], ["_Unquoter", 3], ["HttpVersion", 3], ["_LiteralGenericAlias", 3], ["Order", 3], ["SafeUUID", 3], ["ContentCoding", 3], ["CIMultiDict", 3], ["NicDuplex", 3], ["PlainResource", 3], ["Task", 3], ["SelectorKey", 3], ["IncrementalEncoder", 2], ["BufferedWriter", 2], ["_Environ", 2], ["Quitter", 2], ["Formatter", 2], ["WeakValueDictionary", 2], ["Handlers", 2], ["Purpose", 2], ["_ASN1Object", 2], ["_TypedDictMeta", 2], ["_DeprecatedType", 2], ["mappingproxy", 2], ["Random", 2], ["UnionType", 2], ["slice", 2], ["PosixPath", 2], ["_TypingMeta", 2], ["Charset", 2], ["_localized_day", 2], ["_localized_month", 2], ["WSMessage", 2], ["RawRequestMessage", 2], ["SimpleCookie", 2], ["BatteryTime", 2], ["Process", 2], ["Server", 2], ["FutureIter", 2], ["TimerHandle", 2], ["_WeakValueDictionary", 1], ["IncrementalDecoder", 1], ["CodecInfo", 1], ["BufferedReader", 1], ["_DeprecateByteStringMeta", 1], ["DistutilsMetaFinder", 1], ["_Helper", 1], ["Repr", 1], ["_auto_null", 1], ["_Sentinel", 1], ["_MainThread", 1], ["Event", 1], ["Condition", 1], ["PercentStyle", 1], ["_StderrHandler", 1], ["RootLogger", 1], ["Manager", 1], ["Load", 1], ["Store", 1], ["Del", 1], ["And", 1], ["Or", 1], ["Add", 1], ["Sub", 1], ["Mult", 1], ["MatMult", 1], ["Div", 1], ["Mod", 1], ["Pow", 1], ["LShift", 1], ["RShift", 1], ["BitOr", 1], ["BitXor", 1], ["BitAnd", 1], ["FloorDiv", 1], ["Invert", 1], ["Not", 1], ["UAdd", 1], ["USub", 1], ["Eq", 
1], ["NotEq", 1], ["Lt", 1], ["LtE", 1], ["Gt", 1], ["GtE", 1], ["Is", 1], ["IsNot", 1], ["In", 1], ["NotIn", 1], ["_Unknown", 1], ["_RunningLoop", 1], ["_AnyMeta", 1], ["_LiteralSpecialForm", 1], ["_CallableType", 1], ["_DeprecatedGenericAlias", 1], ["_TupleType", 1], ["NamedTupleMeta", 1], ["TypeAliasType", 1], ["uname_result", 1], ["_HAS_DEFAULT_FACTORY_CLASS", 1], ["_MISSING_TYPE", 1], ["_KW_ONLY_TYPE", 1], ["JSONEncoder", 1], ["JSONDecoder", 1], ["Scanner", 1], ["UCD", 1], ["TextCalendar", 1], ["Compat32", 1], ["_QByteMap", 1], ["_SENTINEL", 1], ["EmptyStreamReader", 1], ["PayloadRegistry", 1], ["ClientWSTimeout", 1], ["Lib", 1], ["ClientTimeout", 1], ["HTTPNotFound", 1], ["_WrapNumbers", 1], ["Application", 1], ["UrlDispatcher", 1], ["FrozenList", 1], ["CleanupContext", 1], ["_UnixDefaultEventLoopPolicy", 1], ["_Local", 1], ["_UnixSelectorEventLoop", 1], ["KqueueSelector", 1], ["_SelectorMapping", 1], ["AppRunner", 1], ["Future", 1], ["TCPSite", 1], ["RequestHandler", 1], ["HttpRequestParser", 1], ["HeadersParser", 1], ["AccessLoggerWrapper", 1], ["_SelectorSocketTransport", 1], ["KeyedRef", 1], ["AccessLogger", 1], ["TransportSocket", 1], ["Response", 1], ["StreamWriter", 1], ["Request", 1], ["UrlMappingMatchInfo", 1], ["CIMultiDictProxy", 1], ["TaskStepMethWrapper", 1]]}
That is what is in memory after everything disconnects. Although RSS keeps increasing, I don't see a leak of objects actually happening |
Going to try building without extensions to see if that changes anything |
Even with no extensions I still see rss increasing each cycle, but I don't see any python objects actually leaking |
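The comparison described here (object counts per type before and after a cycle of connections) can be approximated with the standard library alone. This is a rough sketch in the spirit of `objgraph.most_common_types()`, not a replacement for it; `Connection` is a hypothetical per-client object used only to give the census something to count.

```python
import gc
from collections import Counter


def type_census():
    # Count gc-tracked objects by type name after a full collection.
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())


def census_growth(before, after):
    # Keep only the types whose live-object count increased.
    return {
        name: after[name] - before[name]
        for name in after
        if after[name] > before[name]
    }


class Connection:
    # Hypothetical stand-in for a per-client object.
    pass


baseline = type_census()
connections = [Connection() for _ in range(100)]
during = type_census()
connections.clear()
final = type_census()

growth_during = census_growth(baseline, during)
growth_final = census_growth(baseline, final)
print(growth_during.get("Connection"), growth_final.get("Connection"))
```

If a census like this returns to baseline while RSS keeps climbing, one possible explanation (not confirmed in this thread) is that the memory is held below the Python object layer, for example by an allocator that does not return freed pages to the operating system.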
Modified app that shows objects in memory:
import asyncio
import gc
import objgraph
import aiohttp
from aiohttp import web
import psutil
import pprint

process = psutil.Process()


async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5, compress=False)
    await ws.prepare(request)
    await ws.send_str("hi")
    print(process.memory_info().rss)
    try:
        await ws.send_str("hi")
        print(process.memory_info().rss)
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == "close":
                    break
                # Echo the text payload; send_str() expects a str, not the WSMessage
                await ws.send_str(msg.data)
    except (Exception, asyncio.CancelledError):
        pass
    finally:
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return ws


async def gc_handler(request):
    # Collect first so the counts reflect only objects that are still reachable
    gc.collect()
    return web.json_response(
        {
            "status": "ok",
            "mem": process.memory_info().rss,
            "WebSocketResponse": len(objgraph.by_type("WebSocketResponse")),
            "WebSocketWriter": len(objgraph.by_type("WebSocketWriter")),
            "Transport": len(objgraph.by_type("Transport")),
            "TimerHandle": len(objgraph.by_type("TimerHandle")),
            "FlowControlDataQueue": len(objgraph.by_type("FlowControlDataQueue")),
            "CIMultiDict": len(objgraph.by_type("CIMultiDict")),
            "deque": len(objgraph.by_type("deque")),
            "common": objgraph.most_common_types(limit=500),
        }
    )


async def objects_handler(request):
    return web.json_response({"status": "ok", "mem": process.memory_info().rss, "objects": objgraph.growth(limit=1000)})


app = web.Application(client_max_size=1024**10)
app.add_routes(
    [
        web.get("/land/websocket-tunnel", websocket_handler),
        web.get("/land/gc", gc_handler),
        web.get("/land/objects", objects_handler),
    ]
)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8000)
|
If we adjust the app to do
async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5, compress=False)
    await ws.prepare(request)
    await ws.send_str("hi")
    await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return
and
index 475b1f78..77185dd1 100644
--- a/aiohttp/http_websocket.py
+++ b/aiohttp/http_websocket.py
@@ -729,3 +729,6 @@ class WebSocketWriter:
             )
         finally:
             self._closing = True
+            self.transport.close()
+            self.transport = None
+            self.protocol = None
It doesn't leak.
But as soon as we start receiving, it does leak. |
Probably need to try this on a different OS to see if the results are similar |
After waiting 30 minutes, it is still holding the memory. Need to get something more minimal to reproduce. That's going to be hard as it's not clear yet how it gets triggered. Will need some more work on this when I have some more free cycles |
On a side note we probably use more memory than needed since it inherits from StreamResponse but doesn't use most of it |
Hi, is there anything we can do in our code at the moment? |
Not yet. The source of the leak has not been identified yet |
For info, I'm moving to SSE for now (aiohttp-sse), which also extends StreamResponse. It doesn't show any leak after 2 days; memory remains constant. |
Still haven't had any luck finding the leak |
I think I'm going to have to go line by line |
There is a race on closing, but I don't think it's the leak?
|
might be something holding the exception |
|
killing the test client in the middle of the script makes the leak worse
|
The fire-and-forget ping task might be getting GCed prematurely |
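The asyncio documentation notes that the event loop only keeps weak references to tasks, so a fire-and-forget task with no other reference can be collected before it finishes. The usual pattern is to hold a strong reference until the task is done. A minimal sketch, where `fire_and_forget`, `background_tasks` and `send_ping` are hypothetical names for illustration and not aiohttp's implementation:

```python
import asyncio

background_tasks = set()


def fire_and_forget(coro):
    # Hold a strong reference until the task completes; the event loop
    # itself only keeps weak references to tasks.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def send_ping(log, n):
    # Hypothetical stand-in for a heartbeat ping.
    await asyncio.sleep(0.01)
    log.append(n)


async def main():
    log = []
    for n in range(3):
        fire_and_forget(send_ping(log, n))
    pending_while_running = len(background_tasks)
    await asyncio.sleep(0.1)  # give the pings time to finish
    return log, pending_while_running


log, pending_while_running = asyncio.run(main())
print(sorted(log), pending_while_running, len(background_tasks))
```

The done callback removes each task from the set, so the set itself does not grow over time.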
Can you try #7978 ? |
Ok, I'll try it this week. |
Describe the bug
I have an aiohttp server that provides a route for websockets. When I connect 10000 clients to the server and disconnect them after 5 seconds, the server does not release the memory.
server handler
client script
before run script:
after finish run script:
after I use gc.collect():
To Reproduce
Expected behavior
Memory will be released.
Logs/tracebacks
Python Version
aiohttp Version
multidict Version
yarl Version
OS
Windows 10 21H1 19043.1348
Related component
Server
Additional context
No response
Code of Conduct