frankie567 / httpx-ws

WebSocket support for HTTPX
https://frankie567.github.io/httpx-ws/
MIT License

Serious memory leak in WebSocketSession #76

Closed: ro-oliveira95 closed this 1 month ago

ro-oliveira95 commented 1 month ago

Describe the bug

While using connect_ws I noticed that the application's memory usage was slowly going up. After digging a bit I found that the number of active threads increased every time a new message was received, indicating a memory leak somewhere in the connection/session handling.

To Reproduce

In a separate module (server.py) spin up a simple WebSocket server (here I'm using the websockets library for simplicity) that sends a message every second:

import time

from websockets.sync.server import serve, ServerConnection

def ws_handler(conn: ServerConnection) -> None:
    while True:
        conn.send("ok")
        time.sleep(1)

with serve(ws_handler, "localhost", 8000) as sv:
    sv.serve_forever()

In another module (client.py) run the client code, printing the current active thread count after each received message:

import threading

from httpx_ws import connect_ws

with connect_ws('ws://localhost:8000') as ws:
    while True:
        ws.receive()
        print('threads count:', threading.active_count())

Output:

threads count: 5
threads count: 8
threads count: 9
threads count: 10
threads count: 11
threads count: 12
threads count: 13
threads count: 14
threads count: 15
threads count: 17
threads count: 17
threads count: 18
...
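
To see exactly what is piling up, the live threads can be listed by name (just a diagnostic added on top of the client loop above):

import threading

# Leaked executor workers use the default ThreadPoolExecutor naming scheme,
# e.g. "ThreadPoolExecutor-12_0".
for thread in threading.enumerate():
    print(thread.name)

If the extra threads come from the session's internal executor (see the analysis under "Additional context"), they should show up as ThreadPoolExecutor-<n>_<m> worker threads.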

Expected behavior

The number of active threads should not grow indefinitely.

Configuration

Additional context

I took a look at the WebSocketSession implementation and I think the problem is in the _wait_until_closed method, more precisely when trying to cancel the pending futures (line 531):

https://github.com/frankie567/httpx-ws/blob/dc94596ea02fe1706c26599f19fe06c15e78c6dc/httpx_ws/_api.py#L517-L536

What is happening is that cancel only succeeds on a future that has not started running yet, and the _should_close.wait task is already running (and blocked) by the time it is cancelled. I confirmed this by printing the return value of task.cancel(), which gives False, meaning the cancel operation failed. Since the wait never returns, those worker threads hang forever and pile up until the connection is recycled (or the system memory is exhausted).
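
This matches the documented behavior of concurrent.futures: Future.cancel() only succeeds while the future is still pending. The same failure mode can be reproduced in isolation with a standalone snippet (the names here are mine, not from httpx-ws):

import concurrent.futures
import threading
import time

event = threading.Event()  # plays the role of _should_close; never set by anyone

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(event.wait)  # the worker starts blocking on wait()
time.sleep(0.1)                       # give the worker time to pick up the task
print(future.cancel())                # False: a running future cannot be cancelled
event.set()                           # without this, the shutdown below would block forever
executor.shutdown(wait=True)

The worker thread only goes away once the event is set; until then it just sits blocked inside wait(), which is exactly the pile-up seen in the reproduction above.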

The good news is that the fix can be pretty simple -- we just have to ensure the waiting future actually finishes so its thread is released. For instance, passing a timeout to the _should_close.wait call made the thread count stop growing:

diff --git a/httpx_ws/_api.py b/httpx_ws/_api.py
index dc31cd2..765b1b4 100644
--- a/httpx_ws/_api.py
+++ b/httpx_ws/_api.py
@@ -519,7 +516,7 @@ class WebSocketSession:
     ) -> TaskResult:
         try:
             executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
-            wait_close_task = executor.submit(self._should_close.wait)
+            wait_close_task = executor.submit(self._should_close.wait, timeout=1)
             todo_task = executor.submit(callable, *args, **kwargs)
         except RuntimeError as e:
             raise ShouldClose() from e

Of course a hard-coded timeout is not a good solution to this problem -- this was only a proof of concept. But if this is confirmed, I would be happy to address it properly in a PR :)
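
One direction I'm considering (just a sketch under my own names -- wait_for_either and run_with_should_close are not httpx-ws code): instead of trying to cancel a wait that is already running, give the watcher its own stop event so it can always be woken up and the executor can shut down cleanly:

import concurrent.futures
import threading
from typing import Any, Callable

def wait_for_either(primary: threading.Event, stop: threading.Event, poll: float = 0.1) -> bool:
    # threading has no built-in "wait on several events", so poll the primary
    # event with a short timeout and check the stop flag in between.
    # Returns True if the primary event fired, False if asked to stop.
    while True:
        if primary.wait(poll):
            return True
        if stop.is_set():
            return False

def run_with_should_close(
    should_close: threading.Event,
    callable: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    stop_watcher = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        watcher = executor.submit(wait_for_either, should_close, stop_watcher)
        task = executor.submit(callable, *args, **kwargs)
        concurrent.futures.wait(
            [watcher, task], return_when=concurrent.futures.FIRST_COMPLETED
        )
        # Whichever finished first, release the watcher so the executor can
        # actually shut down instead of leaking a blocked worker thread.
        stop_watcher.set()
        if watcher.done() and watcher.result():
            raise RuntimeError("should close")  # the real code would raise ShouldClose here
        return task.result()

This doesn't answer what to do with a callable that is itself still blocked (the with block waits for it), but the point is that nothing ever depends on cancelling a future that is already running.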

frankie567 commented 1 month ago

Thank you for the detailed analysis and reproduction, @ro-oliveira95! This does indeed sound like a serious one.

I would be glad to review a PR with a proper fix for this. But I don't have a clear idea of how to do it right now 😅

ro-oliveira95 commented 1 month ago

Hi @frankie567, thanks for the quick answer.

But I don't have a clear idea of how to do it right now

Yeah, for now me neither. Searching for "ways to cancel a running future" suggests we have to rely on threading.Event, but I'm not sure it fits well in this context.

But OK, I'll do some experimentation and try to come up with a proper solution in a PR. Maybe I'll have something later today, let's see.

ro-oliveira95 commented 1 month ago

Hey, just to let you know that I came up with a (not so pretty) solution to this, but it caused some tests to fail and I still have to dig deeper to understand what's going on. My plan is to work on this next Sunday.