Closed stolenzc closed 11 months ago
Please add the memory usage after running gc.collect().
Your scripts are incomplete, so I'm unable to reproduce this.
This is the full script:
import asyncio
from websocket import create_connection
import requests

# pip install websocket
# pip install websocket-client


async def connect_ws(id):
    url = 'ws://localhost:8000/land/websocket-tunnel'
    # create_connection() and recv() are blocking calls from websocket-client,
    # so each connection is opened synchronously on the event loop thread.
    ws = create_connection(url)
    response = ws.recv()
    print(str(id) + response)
    await asyncio.sleep(5)
    ws.close()


async def run():
    for i in range(10000):
        asyncio.ensure_future(connect_ws(i))
    await asyncio.sleep(1000)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
You forgot the server. Please save my time.
I am so sorry. Here is a simple, complete server. Start the server from this Python file, then run the client script and watch the memory usage. After the script finishes, you can request http://localhost:8000/land/gc to trigger garbage collection. The problem will reproduce.
Note: you may need to raise the maximum number of client connections with ulimit -n 11000. On my Ubuntu server the default is 1024, and the server raises an error after 1016 clients have connected successfully.
import asyncio
import gc

import aiohttp
from aiohttp import web


async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        await ws.send_str('hi')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    break
                # Echo the text payload; send_str() expects a str, not the WSMessage.
                await ws.send_str(msg.data)
    except (Exception, asyncio.CancelledError) as e:
        pass
    finally:
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return ws


async def gc_handler(request):
    gc.collect()
    return web.json_response({'status': 'ok'})


app = web.Application(client_max_size=1024 ** 10)
app.add_routes([web.get('/land/websocket-tunnel', websocket_handler),
                web.get('/land/gc', gc_handler)])

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8000)
Have you been able to reproduce this issue? Is there a way to resolve it, or a temporary workaround? This is very serious on my server. I hope for your reply.
Any update on this? I have a similar problem, but my code is rather large and I need to tidy it up before I can post it here. Currently I only check using tracemalloc at runtime.
The app starts at 100MB of memory and grows to 200MB after 1 day; I need to restart it every 2 weeks. It can grow to more than 2GB. These are the 5 biggest tracemalloc entries after 24 hours:
lib/python3.10/site-packages/aiohttp/http_websocket.py:614: size=166 MiB, count=4552, average=37.4 KiB
lib/python3.10/site-packages/pydantic/main.py:164: size=36.1 MiB, count=142116, average=266 B
lib/python3.10/multiprocessing/connection.py:251: size=6262 KiB, count=165790, average=39 B
lib/python3.10/site-packages/aiohttp/web_protocol.py:332: size=3545 KiB, count=47172, average=77 B
lib/python3.10/multiprocessing/reduction.py:40: size=1633 KiB, count=9952, average=168 B
The app runs in Docker with python:3.10.13-slim, with only 2-3 ws clients connected.
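For anyone wanting to gather comparable numbers, here is a minimal sketch of how a "top 5" listing like the one above can be produced with the standard library's tracemalloc. The helper name `top_allocations` and the stand-in allocation are assumptions for illustration, not code from the application discussed here.

```python
import tracemalloc


def top_allocations(snapshot, baseline=None, limit=5):
    """Return the `limit` largest allocation sites as printable strings.

    With a baseline snapshot, report the difference since the baseline
    instead of absolute totals.
    """
    if baseline is None:
        stats = snapshot.statistics("lineno")
    else:
        stats = snapshot.compare_to(baseline, "lineno")
    return [str(stat) for stat in stats[:limit]]


def demo():
    tracemalloc.start()
    baseline = tracemalloc.take_snapshot()
    retained = [bytes(1024) for _ in range(1000)]  # stand-in for retained data
    current = tracemalloc.take_snapshot()
    for line in top_allocations(current, baseline):
        print(line)
    tracemalloc.stop()
    return len(retained)


if __name__ == "__main__":
    demo()
```

Comparing against a baseline taken after startup tends to be easier to read than absolute totals, because import-time allocations drop out of the listing.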
I can reproduce with the above code: https://github.com/aio-libs/aiohttp/issues/6325#issuecomment-972963773 https://github.com/aio-libs/aiohttp/issues/6325#issuecomment-973905192
It's necessary to set ulimit -n 11000, and then the memory usage climbs when running the client script. After calling the gc endpoint, the memory usage drops noticeably (but by less than half of the total amount it increased by).
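As an alternative to running ulimit in the shell, the descriptor limit can also be inspected and raised from inside the Python process on Unix. This is only a sketch: `raise_nofile_limit` is a hypothetical helper, the `resource` module is not available on Windows, and the soft limit can never exceed the hard limit without extra privileges.

```python
import resource  # Unix-only standard library module


def raise_nofile_limit(target=11000):
    """Try to raise the soft RLIMIT_NOFILE to `target`.

    Returns the (soft, hard) pair in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # Never ask for more than the hard limit allows.
    wanted = target if hard == resource.RLIM_INFINITY else min(target, hard)
    if soft != resource.RLIM_INFINITY and wanted > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
        except (ValueError, OSError):
            pass  # keep the existing limit if the OS refuses the change
    return resource.getrlimit(resource.RLIMIT_NOFILE)


if __name__ == "__main__":
    print(raise_nofile_limit())
```

Calling it at the top of both the server and the client script would apply the change to each process separately, since the limit is per process.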
I'm struggling to make any sense of tracemalloc output, it's not clear what is happening currently. Maybe there's a better tool to use? I don't think I have time at the moment to look into it.
@bdraco I suspect this may affect homeassistant, so might be worth digging into if you have time.
I'll take a look this weekend if I can find some free cycles. I may have already fixed this problem in HA and need to upstream the fix. (Or it's a different leak)
It does look like something is leaking Python objects
[('coroutine', 67462, 17136),
('method', 42210, 10711),
('deque', 25303, 6429),
('dict', 28606, 6427),
('Context', 25309, 6427),
('list', 17727, 4285),
('ReferenceType', 18536, 4284),
('builtin_function_or_method', 17950, 4284),
('CIMultiDict', 16867, 4284),
('Task', 16867, 4284),
('Future', 16865, 4284),
('FutureIter', 16866, 4284),
('socket', 8437, 2143),
('Handle', 8439, 2143),
('RequestHandler', 8434, 2143),
('HttpRequestParser', 8434, 2143),
('HeadersParser', 8434, 2143),
('AccessLoggerWrapper', 8434, 2143),
('_SelectorSocketTransport', 8434, 2143),
('KeyedRef', 8434, 2143),
('AccessLogger', 8434, 2143),
('TransportSocket', 8434, 2143),
('WebSocketReader', 8433, 2143),
('FlowControlDataQueue', 8433, 2143),
('set', 8637, 2142),
('SplitResult', 8440, 2142),
('HttpVersion', 8435, 2142),
('URL', 8438, 2142),
('RawRequestMessage', 8434, 2142),
('SimpleCookie', 8434, 2142),
('SelectorKey', 8435, 2142),
('Request', 8433, 2142),
('UrlMappingMatchInfo', 8433, 2142),
('CIMultiDictProxy', 8433, 2142),
('StreamWriter', 8433, 2142),
('WebSocketResponse', 8432, 2142),
('WebSocketWriter', 8432, 2142),
('Timeout', 8432, 2142),
('tuple', 13318, 2138)]
These seem to be growing over time
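The rows above appear to pair a type name with its current count and its increase. For readers without objgraph installed, a rough standard-library approximation of that kind of report is sketched below. `type_counts` and `growth` are hypothetical helpers, and `gc.get_objects()` only sees objects tracked by the garbage collector, so the numbers will not match objgraph exactly.

```python
import gc
from collections import Counter


def type_counts():
    """Count live, gc-tracked objects by type name."""
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())


def growth(before, after, limit=10):
    """Return (type name, current count, increase) for types that grew.

    Rows are sorted by increase, largest first.
    """
    rows = [
        (name, count, count - before.get(name, 0))
        for name, count in after.items()
        if count > before.get(name, 0)
    ]
    rows.sort(key=lambda row: row[2], reverse=True)
    return rows[:limit]


if __name__ == "__main__":
    baseline = type_counts()
    retained = [{"n": i} for i in range(100)]  # stand-in for leaked objects
    for row in growth(baseline, type_counts()):
        print(row)
```

Taking the baseline after a first warm-up batch of connections helps separate one-time caches from counts that keep climbing on every batch.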
Also, it doesn't look like the problem I was seeing in HA.
If I don't create the WebSocketResponse, it doesn't leak.
Creating ws = web.WebSocketResponse(heartbeat=5) doesn't leak:
async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5)
    await ws.prepare(request)
    await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return ws
This doesn't appear to leak either
async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5)
    await ws.prepare(request)
    await ws.send_str("hi")
    print(process.memory_info().rss)
    await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
It leaks as soon as the send_str is added, so that gives a much smaller reproducer.
It leaks with compress=False as well.
The leak seems to be the transport. It goes away with:
diff --git a/aiohttp/http_websocket.py b/aiohttp/http_websocket.py
index 475b1f78..25907c62 100644
--- a/aiohttp/http_websocket.py
+++ b/aiohttp/http_websocket.py
@@ -729,3 +729,7 @@ class WebSocketWriter:
             )
         finally:
             self._closing = True
+            self.transport.close()
+            self.transport = None
+            self.protocol = None
+
self.transport = None
self.protocol = None
This isn't enough: without the .close() it leaks, even though connection_lost is being called.
I can reduce the leak with:
@@ -324,14 +329,17 @@ class RequestHandler(BaseProtocol):
         if self._keepalive_handle is not None:
             self._keepalive_handle.cancel()
+            self._keepalive_handle = None
         if self._current_request is not None:
             if exc is None:
                 exc = ConnectionResetError("Connection lost")
             self._current_request._cancel(exc)
+            self._current_request = None
         if self._waiter is not None:
             self._waiter.cancel()
+            self._waiter = None
         if handler_cancellation and self._task_handler is not None:
             self._task_handler.cancel()
@@ -342,6 +350,14 @@ class RequestHandler(BaseProtocol):
             self._payload_parser.feed_eof()
But I haven't been able to find what is holding the reference
leak seems to be the transport. It goes away with
We do this in client_proto, so maybe it's just an oversight that it's not being done in the websockets? https://github.com/aio-libs/aiohttp/blob/669109fee07f6cbd2bfa4ac074431d647d7d6f73/aiohttp/client_proto.py#L69-L75
Or, as we have that protocol handy already, maybe we just need to do self.protocol.close()?
As for what's holding the reference, my suspicion is that until the transport is closed, it has either a callback method: https://github.com/python/cpython/blob/939fc6d6eab9b7ea8c244d513610dbdd556503a7/Lib/asyncio/selector_events.py#L1197-L1199 Or some writer object of ours: https://github.com/python/cpython/blob/939fc6d6eab9b7ea8c244d513610dbdd556503a7/Lib/asyncio/selector_events.py#L864
So, there's probably a circular reference of some kind there, which is also referenced in the loop, until the close() method is called.
Though, that also suggests that if the user doesn't call .close(), then we'll still have the same leak. It should get closed when the connection is lost...
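To make the suspicion above concrete, here is a toy model of that kind of retention. Everything in it is hypothetical (`FakeLoop`, `FakeTransport`, `FakeProtocol` are illustrative stand-ins, not asyncio or aiohttp classes): the transport and protocol reference each other, and the loop-side registry keeps the pair alive until close() unregisters the transport.

```python
import gc
import weakref


class FakeLoop:
    """Hypothetical stand-in for a loop that tracks registered transports."""

    def __init__(self):
        self.registered = set()


class FakeProtocol:
    """Hypothetical protocol holding a reference back to its transport."""

    def __init__(self):
        self.transport = None


class FakeTransport:
    """Hypothetical transport, registered with the loop until close()."""

    def __init__(self, loop, protocol):
        self.loop = loop
        self.protocol = protocol
        loop.registered.add(self)

    def close(self):
        self.loop.registered.discard(self)
        self.protocol = None  # break the transport <-> protocol cycle


def make_connection(loop):
    protocol = FakeProtocol()
    transport = FakeTransport(loop, protocol)
    protocol.transport = transport
    return transport, weakref.ref(protocol)


if __name__ == "__main__":
    loop = FakeLoop()
    transport, protocol_ref = make_connection(loop)
    del transport
    gc.collect()
    print("alive before close:", protocol_ref() is not None)
    for registered in list(loop.registered):
        registered.close()
    del registered
    gc.collect()
    print("alive after close:", protocol_ref() is not None)
```

In this model gc.collect() alone cannot free the pair, because the registry is reachable from the loop. Whether the real leak follows this pattern is exactly the open question in the discussion.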
I dug for a bit more on this but didn't find it.
Out of time for today but will dig more this weekend
Closing the transport there only solves whatever is leaking in the send path. As soon as I add back the receive, it looks like it keeps holding the reference forever and never gets GCed
We also can't close the transport there because it means anything in the buffer will not get sent before it's closed
{"status": "ok", "mem": 144556032, "WebSocketResponse": 0, "WebSocketWriter": 0, "TimerHandle": 2, "FlowControlDataQueue": 0, "CIMultiDict": 3, "deque": 4}
It looks like CIMultiDict and deque (probably from the underlying transport in py3.12+) get leaked, but only sometimes
async def gc_handler(request):
    gc.collect()
    return web.json_response(
        {
            "status": "ok",
            "mem": process.memory_info().rss,
            "WebSocketResponse": len(objgraph.by_type("WebSocketResponse")),
            "WebSocketWriter": len(objgraph.by_type("WebSocketWriter")),
            "TimerHandle": len(objgraph.by_type("TimerHandle")),
            "FlowControlDataQueue": len(objgraph.by_type("FlowControlDataQueue")),
            "CIMultiDict": len(objgraph.by_type("CIMultiDict")),
            "deque": len(objgraph.by_type("deque")),
        }
    )
But if I wait long enough, they do get cleaned up.. hmm
{"status": "ok", "mem": 151601152, "WebSocketResponse": 0, "WebSocketWriter": 0, "Transport": 0, "TimerHandle": 2, "FlowControlDataQueue": 0, "CIMultiDict": 3, "deque": 4, "common": [["function", 7339], ["tuple", 4882], ["dict", 3309], ["ReferenceType", 1672], ["wrapper_descriptor", 1495], ["cell", 1144], ["builtin_function_or_method", 1086], ["method_descriptor", 1073], ["type", 1073], ["getset_descriptor", 1041], ["list", 862], ["property", 739], ["member_descriptor", 666], ["module", 316], ["ModuleSpec", 312], ["frozenset", 258], ["_tuplegetter", 250], ["SourceFileLoader", 237], ["set", 205], ["classmethod", 200], ["_GenericAlias", 189], ["staticmethod", 183], ["_abc_data", 163], ["_UnionGenericAlias", 156], ["ABCMeta", 150], ["Pattern", 93], ["_NamedIntConstant", 76], ["Field", 74], ["HTTPStatus", 62], ["classmethod_descriptor", 52], ["EnumType", 52], ["method", 49], ["ForwardRef", 45], ["IPv6Address", 43], ["reify", 38], ["_SpecialGenericAlias", 37], ["_TLSAlertType", 34], ["ExtensionFileLoader", 33], ["Signals", 31], ["Negsignal", 31], ["IPv4Address", 30], ["_DataclassParams", 29], ["_lru_cache_wrapper", 28], ["IPv6Network", 28], ["FileFinder", 27], ["AlertDescription", 27], ["_CallableGenericAlias", 24], ["cached_property", 24], ["_TLSMessageType", 22], ["TypeVar", 22], ["IPv4Network", 19], ["CType", 19], ["_Precedence", 18], ["SimpleNamespace", 17], ["BufferFlags", 17], ["Logger", 16], ["RegexFlag", 15], ["Options", 15], ["_SpecialForm", 14], ["cython_function_or_method", 14], ["builtin_method", 13], ["WSCloseCode", 13], ["scputimes", 13], ["Month", 12], ["Context", 12], ["AddressFamily", 10], ["MsgFlag", 10], ["AddressInfo", 10], ["_ProtocolMeta", 10], ["partial", 10], ["_Feature", 10], ["SSLErrorNumber", 9], ["GenericAlias", 9], ["HTTPMethod", 9], ["WSMsgType", 9], ["defaultdict", 8], ["_State", 8], ["SplitResult", 8], ["lock", 7], ["VerifyFlags", 7], ["TLSVersion", 7], ["Day", 7], ["Struct", 7], ["KeyMethod", 7], ["_SSLMethod", 6], ["_TLSContentType", 
6], ["URL", 6], ["ResourceRoute", 6], ["coroutine", 6], ["Handle", 6], ["WeakSet", 5], ["SocketKind", 5], ["_ABC", 5], ["_ParameterKind", 5], ["SSLProtocolState", 5], ["ChunkState", 5], ["FlagBoundary", 4], ["method-wrapper", 4], ["RLock", 4], ["deque", 4], ["PlaceHolder", 4], ["AppProtocolState", 4], ["_BarrierState", 4], ["ParseState", 4], ["WSParserState", 4], ["UUID", 4], ["Signal", 4], ["socket", 4], ["TextIOWrapper", 3], ["FileIO", 3], ["_Printer", 3], ["EnumCheck", 3], ["count", 3], ["Sigmasks", 3], ["VerifyMode", 3], ["_SendfileMode", 3], ["_FIELD_BASE", 3], ["_Unquoter", 3], ["HttpVersion", 3], ["_LiteralGenericAlias", 3], ["Order", 3], ["SafeUUID", 3], ["ContentCoding", 3], ["CIMultiDict", 3], ["NicDuplex", 3], ["PlainResource", 3], ["Task", 3], ["SelectorKey", 3], ["IncrementalEncoder", 2], ["BufferedWriter", 2], ["_Environ", 2], ["Quitter", 2], ["Formatter", 2], ["WeakValueDictionary", 2], ["Handlers", 2], ["Purpose", 2], ["_ASN1Object", 2], ["_TypedDictMeta", 2], ["_DeprecatedType", 2], ["mappingproxy", 2], ["Random", 2], ["UnionType", 2], ["slice", 2], ["PosixPath", 2], ["_TypingMeta", 2], ["Charset", 2], ["_localized_day", 2], ["_localized_month", 2], ["WSMessage", 2], ["RawRequestMessage", 2], ["SimpleCookie", 2], ["BatteryTime", 2], ["Process", 2], ["Server", 2], ["FutureIter", 2], ["TimerHandle", 2], ["_WeakValueDictionary", 1], ["IncrementalDecoder", 1], ["CodecInfo", 1], ["BufferedReader", 1], ["_DeprecateByteStringMeta", 1], ["DistutilsMetaFinder", 1], ["_Helper", 1], ["Repr", 1], ["_auto_null", 1], ["_Sentinel", 1], ["_MainThread", 1], ["Event", 1], ["Condition", 1], ["PercentStyle", 1], ["_StderrHandler", 1], ["RootLogger", 1], ["Manager", 1], ["Load", 1], ["Store", 1], ["Del", 1], ["And", 1], ["Or", 1], ["Add", 1], ["Sub", 1], ["Mult", 1], ["MatMult", 1], ["Div", 1], ["Mod", 1], ["Pow", 1], ["LShift", 1], ["RShift", 1], ["BitOr", 1], ["BitXor", 1], ["BitAnd", 1], ["FloorDiv", 1], ["Invert", 1], ["Not", 1], ["UAdd", 1], ["USub", 1], ["Eq", 
1], ["NotEq", 1], ["Lt", 1], ["LtE", 1], ["Gt", 1], ["GtE", 1], ["Is", 1], ["IsNot", 1], ["In", 1], ["NotIn", 1], ["_Unknown", 1], ["_RunningLoop", 1], ["_AnyMeta", 1], ["_LiteralSpecialForm", 1], ["_CallableType", 1], ["_DeprecatedGenericAlias", 1], ["_TupleType", 1], ["NamedTupleMeta", 1], ["TypeAliasType", 1], ["uname_result", 1], ["_HAS_DEFAULT_FACTORY_CLASS", 1], ["_MISSING_TYPE", 1], ["_KW_ONLY_TYPE", 1], ["JSONEncoder", 1], ["JSONDecoder", 1], ["Scanner", 1], ["UCD", 1], ["TextCalendar", 1], ["Compat32", 1], ["_QByteMap", 1], ["_SENTINEL", 1], ["EmptyStreamReader", 1], ["PayloadRegistry", 1], ["ClientWSTimeout", 1], ["Lib", 1], ["ClientTimeout", 1], ["HTTPNotFound", 1], ["_WrapNumbers", 1], ["Application", 1], ["UrlDispatcher", 1], ["FrozenList", 1], ["CleanupContext", 1], ["_UnixDefaultEventLoopPolicy", 1], ["_Local", 1], ["_UnixSelectorEventLoop", 1], ["KqueueSelector", 1], ["_SelectorMapping", 1], ["AppRunner", 1], ["Future", 1], ["TCPSite", 1], ["RequestHandler", 1], ["HttpRequestParser", 1], ["HeadersParser", 1], ["AccessLoggerWrapper", 1], ["_SelectorSocketTransport", 1], ["KeyedRef", 1], ["AccessLogger", 1], ["TransportSocket", 1], ["Response", 1], ["StreamWriter", 1], ["Request", 1], ["UrlMappingMatchInfo", 1], ["CIMultiDictProxy", 1], ["TaskStepMethWrapper", 1]]}
That is what is in memory after everything disconnects.. Although RSS keeps increasing, I don't see a leak of objects actually happening
Going to try building without extensions to see if that changes anything
async def gc_handler(request):
    gc.collect()
    return web.json_response(
        {
            "status": "ok",
            "mem": process.memory_info().rss,
            "WebSocketResponse": len(objgraph.by_type("WebSocketResponse")),
            "WebSocketWriter": len(objgraph.by_type("WebSocketWriter")),
            "Transport": len(objgraph.by_type("Transport")),
            "TimerHandle": len(objgraph.by_type("TimerHandle")),
            "FlowControlDataQueue": len(objgraph.by_type("FlowControlDataQueue")),
            "CIMultiDict": len(objgraph.by_type("CIMultiDict")),
            "deque": len(objgraph.by_type("deque")),
            "common": objgraph.most_common_types(limit=500),
        }
    )
Even with no extensions I still see rss increasing each cycle, but I don't see any python objects actually leaking
Modified app that shows objects in memory:
import asyncio
import gc
import objgraph
import aiohttp
from aiohttp import web
import psutil
import pprint

process = psutil.Process()


async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5, compress=False)
    await ws.prepare(request)
    await ws.send_str("hi")
    print(process.memory_info().rss)
    try:
        await ws.send_str("hi")
        print(process.memory_info().rss)
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == "close":
                    break
                # Echo the text payload; send_str() expects a str, not the WSMessage.
                await ws.send_str(msg.data)
    except (Exception, asyncio.CancelledError):
        pass
    finally:
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return ws


async def gc_handler(request):
    gc.collect()
    return web.json_response(
        {
            "status": "ok",
            "mem": process.memory_info().rss,
            "WebSocketResponse": len(objgraph.by_type("WebSocketResponse")),
            "WebSocketWriter": len(objgraph.by_type("WebSocketWriter")),
            "Transport": len(objgraph.by_type("Transport")),
            "TimerHandle": len(objgraph.by_type("TimerHandle")),
            "FlowControlDataQueue": len(objgraph.by_type("FlowControlDataQueue")),
            "CIMultiDict": len(objgraph.by_type("CIMultiDict")),
            "deque": len(objgraph.by_type("deque")),
            "common": objgraph.most_common_types(limit=500),
        }
    )


async def objects_handler(request):
    return web.json_response({"status": "ok", "mem": process.memory_info().rss, "objects": objgraph.growth(limit=1000)})


app = web.Application(client_max_size=1024**10)
app.add_routes(
    [
        web.get("/land/websocket-tunnel", websocket_handler),
        web.get("/land/gc", gc_handler),
        web.get("/land/objects", objects_handler),
    ]
)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8000)
If we adjust the app to do
async def websocket_handler(request):
    mem = process.memory_info().rss
    print(mem)
    ws = web.WebSocketResponse(heartbeat=5, compress=False)
    await ws.prepare(request)
    await ws.send_str("hi")
    await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY)
    return
and
index 475b1f78..77185dd1 100644
--- a/aiohttp/http_websocket.py
+++ b/aiohttp/http_websocket.py
@@ -729,3 +729,6 @@ class WebSocketWriter:
             )
         finally:
             self._closing = True
+            self.transport.close()
+            self.transport = None
+            self.protocol = None
It doesn't leak
But as soon as we start receiving, it does leak.
Probably need to try this on a different OS to see if the results are similar.
After waiting 30 minutes, it is still holding the memory.
Need to get something more minimal to reproduce. That's going to be hard, as it's not clear yet how it gets triggered. Will need some more work on this when I have some more free cycles.
On a side note we probably use more memory than needed since it inherits from StreamResponse but doesn't use most of it
Hi, is there anything we can do in our code at the moment ?
Not yet. The source of the leak has not been identified yet
On a side note we probably use more memory than needed since it inherits from StreamResponse but doesn't use most of it
For info I'm moving to sse for now ( aiohttp-sse ) which also extends StreamResponse, doesn't show any leak after 2days. memory remain constant
Still haven't had any luck finding the leak
I think I'm going to have to go line by line
There is a race on closing, but I don't think it's the leak?
Task exception was never retrieved
future: <Task finished name='Task-35907' coro=<WebSocketWriter.ping() done, defined at /Users/bdraco/aiohttp/aiohttp/http_websocket.py:702> exception=AttributeError("'NoneType' object has no attribute 'is_closing'")>
Traceback (most recent call last):
  File "/Users/bdraco/aiohttp/aiohttp/http_websocket.py", line 706, in ping
    await self._send_frame(message, WSMsgType.PING)
  File "/Users/bdraco/aiohttp/aiohttp/http_websocket.py", line 673, in _send_frame
    self._write(header + message)
  File "/Users/bdraco/aiohttp/aiohttp/http_websocket.py", line 692, in _write
    if self.transport.is_closing():
       ^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'is_closing'
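The traceback shows a write arriving after the transport reference has already been set to None. One way such a race could be guarded is sketched below, assuming a simplified writer; `SketchWriter` is hypothetical and is not aiohttp's WebSocketWriter, nor necessarily how the library handles this case.

```python
class SketchWriter:
    """Hypothetical, simplified writer used only to illustrate the guard."""

    def __init__(self, transport):
        self.transport = transport

    def close(self):
        # Mirrors the experiment of dropping the transport reference on close.
        self.transport = None

    def write(self, data):
        transport = self.transport
        # Treat a dropped transport the same as a closing one, so a late
        # ping fails with a connection error instead of AttributeError.
        if transport is None or transport.is_closing():
            raise ConnectionResetError("Cannot write to closing transport")
        transport.write(data)
```

A caller that sends heartbeat pings in the background could then catch ConnectionResetError and stop quietly once the connection is gone.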
frame 25254
traceback 14721
dict 7367
might be something holding the exception
frame 48575
traceback 28329
dict 11255
deque 8098
killing the test client in the middle of the script makes the leak worse
ConnectionResetError 4312
FlowControlDataQueue 4312
WebSocketResponse 4312
WebSocketWriter 4312
wrapper_descriptor 1515
The fire-and-forget ping task might be getting GCed prematurely
@stolenzc @wiryonolau
Can you try https://github.com/aio-libs/aiohttp/pull/7978 ?
Ok, I'll try it this week.
Describe the bug
I have an aiohttp server that provides a route for websockets. When I connect 10000 clients to the server and disconnect them after 5 seconds, the server does not release the memory.
server handler
client script
before run script:
after finish run script:
after I use gc.collect():
To Reproduce
Expected behavior
The memory should be released.
Logs/tracebacks
Python Version
aiohttp Version
multidict Version
yarl Version
OS
Windows 10 21H1 19043.1348
Related component
Server
Additional context
No response