pywebio / PyWebIO

Write interactive web app in script way.
https://pywebio.readthedocs.io
MIT License
4.53k stars 384 forks source link

用户断开协程会话时无法正确清理 #545

Closed 18870 closed 1 year ago

18870 commented 1 year ago

BUG描述

使用 fastapi (ws adapter),协程会话时:(问题与fastapi无关)

  1. 在当前有 pywebio.session.coroutinebased.Task 在运行时断开浏览器连接,没有清理 Task.pending_futures,内部的 future 引用了 Task._wakeup 可能导致了 Task 不会被 gc 清理。
  2. 断开浏览器连接,会运行 pywebio.platform.adaptor.ws.WebSocketHandler.notify_connection_lost(),其中没有处理 _state.unclosed_sessions,包含当前的 CoroutineBasedSession 对象。由于默认的 reconnect_timeout 为0,session_clean_task() 也不会启动,最后导致该对象一直存活,不会被 gc 清理。
  3. notify_connection_lost() 的调用中,有调用 Session.close() -> Session._cleanup(),但是没有清理由WebSocketHandler 传入的 _on_task_command_on_session_close,导致 WebSocketHandler 无法被 gc 清理。

复现

首先我在

添加了 print 提示函数调用。这部分代码省略。

import asyncio
from pywebio.session import local, run_async, hold

class A:
    def __del__(self):
        print("A __del__")

async def index():
    a = A()
    local.a = a
    await asyncio.sleep(5)

    # 这两个的结果一样
    await hold() # type: ignore
    # run_async(run_forever())

    print("index finished")

async def run_forever():
    while True:
        await asyncio.sleep(1)
        print("run_forever")

from pywebio.platform.fastapi import start_server
start_server(index, port=80, reconnect_timeout=0)

浏览器访问,并关闭网页,五秒内和五秒后关闭效果是一样的。

INFO:     127.0.0.1:65280 - "GET / HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 65296) - "WebSocket /" [accepted]
WebSocketHandler.__init__
CoroutineBaseSession __init__
Task _start_main_task-yIw2tO8gDv init
INFO:     connection open
# 这里关闭了网页
# _start_main_task在 exception 后直接设置了 task_closed = True
Task _start_main_task-yIw2tO8gDv step exception
# 后续调用的 close 直接 return 了
Task _start_main_task-yIw2tO8gDv close
# Task 在 close 后没有销毁
# WebSocketHandler 在 notify_connection_lost 结束后没有销毁
INFO:     connection closed
# Session 未销毁,其中 local 包含的数据 a 在里面,也没有销毁。

如何修复

  1. 清理 pending_future 在 Task.step 的 try: ... except: 的最后添加
            except Exception as e:
                print(f"Task {self.coro_id} step exception")
                if not isinstance(e, SessionException):
                    self.session.on_task_exception()
                self.task_closed = True
                self.on_coro_stop(self)
    +                while self.pending_futures:
    +                    _, f = self.pending_futures.popitem()
    +                    f.cancel()

    这是从 Task.close 里复制过来的,直接将pending_futures.clear()了也可以。我不清楚 pending_future 的作用,也许前面的 except StopIteration as e: 后也需要添加?

在 debug 的过程中,发现 Session 和 Task 有互相引用,将 Task 内对 Session 的引用删除可以让 Session 自动清理,但是 Task 仍然因为上述原因保留着。

            except Exception as e:
                print(f"Task {self.coro_id} step exception")
                if not isinstance(e, SessionException):
                    self.session.on_task_exception()
                self.task_closed = True
                self.on_coro_stop(self)
+                self.on_coro_stop = None 

        future = None
        if isinstance(coro_yield, WebIOFuture):
            if coro_yield.coro:
                future = asyncio.run_coroutine_threadsafe(coro_yield.coro, asyncio.get_event_loop())
        elif coro_yield is not None:
            future = coro_yield
        if not self.session.closed() and hasattr(future, 'add_done_callback'):
            future.add_done_callback(self._wakeup)
            self.pending_futures[id(future)] = future
+        elif self.session.closed():
+            self.session = None
  1. 清理 unclosed_sessions 修改 notify_connection_lost
    def notify_connection_lost(self):
        _state.active_connections.pop(self.session_id, None)
        if not self.reconnectable:
            # when the connection lost is caused by `on_session_close()`, it's OK to close the session here though.
            # because the `session.close()` is reentrant
            self.session.close(nonblock=True)
    +            _state.unclosed_sessions.pop(self.session_id, None)
        else:
            if self.session_id in _state.unclosed_sessions:
                _state.detached_sessions[self.session_id] = time.time()
        logger.debug("WebSocket closed")
  2. 清理 _on_task_command_on_session_close 修改 _cleanup
    def _cleanup(self):
        for t in list(self.coros.values()):  # t.close() may cause self.coros changed size
            t.step(SessionClosedException, throw_exp=True)
            t.close()
        self.coros = {}  # delete session tasks
    +        self._on_session_close = None
    +        self._on_task_command = None

修改后 Task 和 Session 都能自动清理了。

INFO:     127.0.0.1:55511 - "GET / HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 55519) - "WebSocket /" [accepted]
WebSocketHandler.__init__
CoroutineBaseSession __init__
Task _start_main_task-rSAdf85syK init
INFO:     connection open
Task _start_main_task-rSAdf85syK step exception
Task _start_main_task-rSAdf85syK close
WebSocketHandler.__del__
Task _start_main_task-rSAdf85syK del
CoroutineBaseSession __del__
A __del__
INFO:     connection closed

原因

我的代码里使用 weakref.WeakValueDictionary 弱引用了一个对象,这个对象被我放进 session.local 里了。导致退出时字典值仍然有引用而不会自动清理。

环境信息

wang0618 commented 1 year ago

感谢反馈,最近会研究一下

wang0618 commented 1 year ago

已经在开发版中修复这个bug了。

更新了下 WebSocketHandler,只有在启用reconnect时才将session保存。这个是之前内存泄漏的直接原因。 另外几处循环引用问题也修复了,虽然python的gc也有机制可以自动回收循环引用的对象,不过消除循环引用还是有很大好处。