aio-libs / aiohttp

Asynchronous HTTP client/server framework for asyncio and Python
https://docs.aiohttp.org

ClientWebSocketResponse.closed is False but WebSocketWriter.transport.is_closing() is True #8171

Closed mastak closed 3 weeks ago

mastak commented 8 months ago

Describe the bug

ClientWebSocketResponse.closed remains False when the WebSocket connection is closed abruptly, either because of a failure or because the server closes it on its own. The transport does know that the connection is going away (WebSocketWriter.transport.is_closing() returns True), but the ClientWebSocketResponse instance offers no public way to read that state.
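To make the gap concrete, here is a minimal sketch of the check I would like to be able to write. It reads the same private attributes as the client reproducer (_writer and its transport), so it is an assumption about aiohttp 3.9.0 internals rather than a supported API, and looks_writable is a hypothetical helper name.

```python
def looks_writable(ws) -> bool:
    """Best-effort guess at whether a send on `ws` can still succeed.

    `ws` is expected to look like aiohttp.ClientWebSocketResponse.
    NOTE: `_writer` and `transport` are private details observed in
    aiohttp 3.9.0; they are not public API and may change.
    """
    if ws.closed:
        return False
    writer = getattr(ws, "_writer", None)
    transport = getattr(writer, "transport", None)
    if transport is None:
        # Private layout differs; fall back to the public flag alone.
        return True
    return not transport.is_closing()
```

Even with this check, the connection can still drop between the check and the send, so it only narrows the window and does not remove the need to handle the exception.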

To Reproduce

Server:

import json
import logging

from aiohttp.web import Application, run_app, WebSocketResponse

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] (%(name)s) %(filename)s:%(lineno)d - %(message)s",
)

logger = logging.getLogger("ws_server")

async def ws_handler(request):
    ws = WebSocketResponse(heartbeat=30, max_msg_size=10 * 1024 * 1024)

    await ws.prepare(request)
    msg = await ws.receive()
    logger.info("Received msg %s", msg)
    for i in range(10):
        data = {"name": "Jon", "index": i, "response": True}
        await ws.send_bytes(json.dumps(data).encode())
        if i > 5:
            raise SystemExit

        msg = await ws.receive()
        logger.info("Received msg %s", msg)

    logger.info("websocket connection closed")
    return ws

def create_web_app() -> Application:
    app = Application()
    app.router.add_get("/", ws_handler)
    return app

if __name__ == "__main__":
    run_app(create_web_app(), host="0.0.0.0", port=9090)

Client:

import asyncio
import json
import logging

import aiohttp
from aiohttp import ClientWebSocketResponse

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] (%(name)s) %(filename)s:%(lineno)d - %(message)s",
)

logger = logging.getLogger("ws_server")

WS_CLOSED = {
    aiohttp.WSMsgType.CLOSE,
    aiohttp.WSMsgType.CLOSING,
    aiohttp.WSMsgType.CLOSED,
}

async def some_job(args):
    await asyncio.sleep(0.1)

async def main():
    url = "http://127.0.0.1:9090/"
    session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5))
    async with session.ws_connect(url) as ws_client_resp:
        ws_client_resp: ClientWebSocketResponse
        await ws_client_resp.send_bytes(b"hello")

        while "Live stream":
            ws_message = await ws_client_resp.receive(timeout=10)
            logger.info(f"received msg {ws_message.type} - {ws_message.data}")
            if ws_message.type in WS_CLOSED:
                break

            data = json.loads(ws_message.data)
            await some_job(data)

            logger.info(
                f"Before send_bytes is closed: {ws_client_resp.closed}, "
                f"transport.is_closing: {ws_client_resp._writer.transport.is_closing()}"
            )
            if not ws_client_resp.closed:
                await ws_client_resp.send_bytes(b"resp")

    await session.close()

if __name__ == "__main__":
    asyncio.run(main())

Expected behavior

There should be a way to check the connection state, so that the client does not spend time preparing data to send over a transport that is already closed.

Logs/tracebacks

2024-02-20 13:38:47,485 [DEBUG] (asyncio) selector_events.py:54 - Using selector: KqueueSelector
2024-02-20 13:38:47,489 [INFO] (ws_server) client.py:35 - received msg 2 - b'{"name": "Jon", "index": 0, "response": true}'
2024-02-20 13:38:47,590 [INFO] (ws_server) client.py:42 - Before send_bytes is closed: False, transport.is_closing: False
2024-02-20 13:38:47,591 [INFO] (ws_server) client.py:35 - received msg 2 - b'{"name": "Jon", "index": 1, "response": true}'
2024-02-20 13:38:47,692 [INFO] (ws_server) client.py:42 - Before send_bytes is closed: False, transport.is_closing: False
2024-02-20 13:38:47,693 [INFO] (ws_server) client.py:35 - received msg 2 - b'{"name": "Jon", "index": 2, "response": true}'
2024-02-20 13:38:47,794 [INFO] (ws_server) client.py:42 - Before send_bytes is closed: False, transport.is_closing: True
Traceback (most recent call last):
  File "/app/source/client.py", line 51, in <module>
    asyncio.run(main())
  File "/app/.pyenv/versions/3.9.16/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/app/.pyenv/versions/3.9.16/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/app/source/client.py", line 47, in main
    await ws_client_resp.send_bytes(b"resp")
  File "/app/source/venv/lib/python3.9/site-packages/aiohttp/client_ws.py", line 170, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/app/source/venv/lib/python3.9/site-packages/aiohttp/http_websocket.py", line 722, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/app/source/venv/lib/python3.9/site-packages/aiohttp/http_websocket.py", line 679, in _send_frame
    self._write(header + mask + message)
  File "/app/source/venv/lib/python3.9/site-packages/aiohttp/http_websocket.py", line 697, in _write
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
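
As the traceback shows, the send fails with ConnectionResetError even though closed was False just before it. A possible stopgap is to treat that exception as "connection gone". This is only a sketch: send_bytes_if_open is a hypothetical helper name, and it assumes ws behaves like ClientWebSocketResponse (a closed attribute and an awaitable send_bytes).

```python
import asyncio


async def send_bytes_if_open(ws, payload: bytes) -> bool:
    """Try to send `payload`; return False if the connection is gone.

    Assumes `ws` exposes `closed` and an awaitable `send_bytes`,
    as ClientWebSocketResponse does in the reproducer above.
    """
    if ws.closed:
        return False
    try:
        await ws.send_bytes(payload)
    except ConnectionResetError:
        # Raised when the underlying transport is already closing,
        # which is the case the traceback above captures.
        return False
    return True
```

In the client loop this would replace the "if not ws_client_resp.closed" block, with the loop breaking when the helper returns False. It does not avoid the cost of preparing the payload, which is why a public state check would still be useful.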

Python Version

3.9.16

aiohttp Version

3.9.0

multidict Version

6.0.4

yarl Version

1.9.2

OS

Linux, macOS

Related component

Client

Additional context

No response

Dreamsorcerer commented 2 months ago

Are you still seeing this issue on the latest release? Maybe I'm doing something wrong, but I don't seem to get that error with your reproducer.