encode / starlette

The little ASGI framework that shines. 🌟
https://www.starlette.io/
BSD 3-Clause "New" or "Revised" License

[Websockets] RuntimeError: Cannot call "receive" once a disconnect message has been received. #2617

Closed laxuscullen closed 3 months ago

laxuscullen commented 5 months ago

As Alex recommended, and since this seems to be an issue with starlette rather than broadcaster, I am creating this issue.

The RuntimeError below is raised when the client disconnects.

I have put together sample code that reproduces the error at https://github.com/laxuscullen/WS/tree/wo_broadcaster

  File "venv/lib/python3.12/site-packages/starlette/endpoints.py", line 79, in dispatch
    message = await websocket.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.12/site-packages/starlette/websockets.py", line 60, in receive
    raise RuntimeError(
RuntimeError: Cannot call "receive" once a disconnect message has been received.

laxuscullen commented 5 months ago

@alex-oleshkevich is there any update on this?

Kludex commented 5 months ago

What's the expected behavior here? Raise WebSocketDisconnect instead?

laxuscullen commented 5 months ago

What's the expected behavior here? Raise WebSocketDisconnect instead?

From what I understand, it is not disconnecting the websocket, but the websocket disconnect event is fired. Won't raising cause a runtime error?

alex-oleshkevich commented 5 months ago

I haven't had a chance to look at it yet. As a workaround, use function-style endpoints.

alex-oleshkevich commented 4 months ago

Steps to reproduce

# examples/test.py
import asyncio
import anyio

from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.routing import WebSocketRoute, Route
from starlette.websockets import WebSocket
from starlette.responses import HTMLResponse

class RoomChatWebsocket(WebSocketEndpoint):
    encoding = "json"

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        await websocket.send_json({})

        async def sender():
            while True:
                await asyncio.sleep(1)
            print("sender exit")

        async def on_message_received(websocket: WebSocket):
            async for data in websocket.iter_json():
                print("------------ got message", data)

        async with anyio.create_task_group() as task_group:

            async def receiver():
                await on_message_received(websocket)
                task_group.cancel_scope.cancel()
                print("cancelled")

            task_group.start_soon(receiver)
            task_group.start_soon(sender)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        print(f"Disconnected: {websocket}")

def index_route(request):
    return HTMLResponse('<script>ws = new WebSocket("ws://localhost:8000/ws")</script>')

routes = [
    Route("/", index_route),
    WebSocketRoute("/ws", RoomChatWebsocket),
]

app = Starlette(
    debug=True,
    routes=routes,
)
  1. run server: uvicorn examples.test:app
  2. open the page http://localhost:8000
  3. refresh the page
  4. browser closes connection, causing the following stack trace
    ERROR:    Exception in ASGI application
    Traceback (most recent call last):
    File "/home/alex/projects/starlette/venv/lib/python3.12/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 244, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "/home/alex/projects/starlette/venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "/home/alex/projects/starlette/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/middleware/errors.py", line 151, in __call__
    await self.app(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
    File "/home/alex/projects/starlette/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
    File "/home/alex/projects/starlette/starlette/routing.py", line 754, in __call__
    await self.middleware_stack(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/routing.py", line 774, in app
    await route.handle(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/routing.py", line 371, in handle
    await self.app(scope, receive, send)
    File "/home/alex/projects/starlette/starlette/endpoints.py", line 93, in dispatch
    raise exc
    File "/home/alex/projects/starlette/starlette/endpoints.py", line 80, in dispatch
    message = await websocket.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
    File "/home/alex/projects/starlette/starlette/websockets.py", line 61, in receive
    raise RuntimeError(
    RuntimeError: Cannot call "receive" once a disconnect message has been received.

The problem disappears when I remove task_group.cancel_scope.cancel(). I think this is somehow connected to task cancellation.

I also observe some weird behavior. On websocket.disconnect, WebSocket.receive does return a message, but this message is not available in WebSocketEndpoint.dispatch. Another loop iteration then happens immediately, calls receive, and fails because the WS state is disconnected.

laxuscullen commented 3 months ago

@alex-oleshkevich what's even weirder is that no amount of if-checks on the websocket state before the loop iteration helps.

Kludex commented 3 months ago

This is not an issue.

Use the on_receive method, instead of calling websocket.iter_json or any other method that calls websocket.receive().
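
A toy model of why this advice applies, assuming (as the traceback suggests) that the class-based endpoint's dispatch runs its own receive loop once on_connect returns. ToySocket, on_connect and dispatch below are simplified stand-ins written for illustration, not Starlette's actual code. The inner loop plays the role of iter_json(): it consumes the disconnect message, so the outer reader has nothing left to read and hits the RuntimeError.

```python
import asyncio


class ToySocket:
    """Simplified stand-in for a websocket wrapper (not Starlette's class)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._disconnected = False

    async def receive(self):
        # Refuse further reads once a disconnect message has been delivered.
        if self._disconnected:
            raise RuntimeError(
                'Cannot call "receive" once a disconnect message has been received.'
            )
        message = self._messages.pop(0)
        if message["type"] == "websocket.disconnect":
            self._disconnected = True
        return message


async def on_connect(ws):
    """Plays the role of an on_connect that runs its own iter_json()-style loop."""
    seen = []
    while True:
        message = await ws.receive()
        if message["type"] == "websocket.disconnect":
            return seen  # the disconnect message is consumed here
        seen.append(message["text"])


async def dispatch(ws):
    """Plays the role of the endpoint's own receive loop."""
    seen = await on_connect(ws)
    try:
        await ws.receive()  # second reader: the disconnect was already consumed
    except RuntimeError as exc:
        return seen, str(exc)
    return seen, None


messages = [
    {"type": "websocket.receive", "text": "hello"},
    {"type": "websocket.disconnect", "code": 1001},
]
seen, error = asyncio.run(dispatch(ToySocket(messages)))
print(seen, error)
```

With a single reader (the endpoint's loop handing data to on_receive), the disconnect message reaches the loop that is prepared to stop on it.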

laxuscullen commented 3 months ago

This is not an issue.

Use the on_receive method, instead of calling websocket.iter_json or any other method that calls websocket.receive().

Hi, kindly check the guide at https://github.com/encode/broadcaster, which says to use iter_text. If I do not use iter_json, the websocket stops listening for incoming messages after receiving the first one. Kindly test the code without iter_json.

Could you please edit the code in the guide on the broadcaster page and tell me what the code should look like?

laxuscullen commented 3 months ago

@Kludex ?

Kludex commented 3 months ago

No. I cannot. I don't maintain that project.

laxuscullen commented 3 months ago

No. I cannot. I don't maintain that project.

@Kludex you closed this issue after a vague reply that only Alex could understand, with no further help?

@alex-oleshkevich could you please tell me what changes to make in the code, since your guide for broadcaster contains code with issues?

alex-oleshkevich commented 3 months ago

@laxuscullen, Kludex is referring to the WebSocketEndpoint.on_receive method. See https://www.starlette.io/endpoints/#websocketendpoint

The broadcaster's documentation is correct: it demos usage with function-style code. You use class-based endpoints. So in your case the solution can look like this (I didn't test it myself, but I hope you get the idea).


class RoomChatWebsocket(WebSocketEndpoint):
    encoding = "json"

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()

    async def on_receive(self, websocket: WebSocket, data):
        # do something with received data
        pass

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        print('disconnected')

or use function-style code as broadcaster suggests.

laxuscullen commented 3 months ago

@alex-oleshkevich I am not sure how to use the subscriber methods or the sender method with this. I am not able to figure it out. Your help with this class-based approach is appreciated. I prefer class-based endpoints since I have method decorator code written around classes.

I refactored my code and I am able to use the function-based approach.

Also, in the function-based approach, is there no on_disconnect?

alex-oleshkevich commented 3 months ago

@laxuscullen here it is

import asyncio
import os

from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates
from starlette.websockets import WebSocket

from broadcaster import Broadcast

BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://")

broadcast = Broadcast(BROADCAST_URL)
templates = Jinja2Templates("example/templates")

async def homepage(request):
    template = "index.html"
    context = {"request": request}
    return templates.TemplateResponse(template, context)

class ChatRoomWebsocket(WebSocketEndpoint):
    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        self._listener_task = asyncio.create_task(self.chatroom_ws_sender(websocket))

    async def on_receive(self, websocket: WebSocket, data):
        await broadcast.publish(channel="chatroom", message=data)

    async def chatroom_ws_sender(self, websocket) -> None:
        async with broadcast.subscribe(channel="chatroom") as subscriber:
            async for event in subscriber:
                await websocket.send_text(event.message)

    async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
        if self._listener_task:
            self._listener_task.cancel()

routes = [
    Route("/", homepage),
    WebSocketRoute("/", ChatRoomWebsocket, name="chatroom_ws"),
]

app = Starlette(
    routes=routes,
    on_startup=[broadcast.connect],
    on_shutdown=[broadcast.disconnect],
)
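
One possible refinement to on_disconnect above, as a sketch rather than a required change: Task.cancel() only requests cancellation, so awaiting the task afterwards makes sure the `async with` block inside the listener has finished its cleanup before the handler returns. The subscribe, listener, stop and log names below are hypothetical stand-ins (subscribe imitates broadcast.subscribe), and the example uses only asyncio.

```python
import asyncio
import contextlib

log = []  # hypothetical event log for the demo


@contextlib.asynccontextmanager
async def subscribe():
    """Hypothetical stand-in for `broadcast.subscribe(...)`."""
    log.append("subscribed")
    try:
        yield
    finally:
        log.append("unsubscribed")


async def listener():
    async with subscribe():
        await asyncio.Event().wait()  # block forever, like waiting for events


async def stop(task):
    """Cancel the task and wait until its cleanup has finished."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def main():
    task = asyncio.create_task(listener())
    await asyncio.sleep(0)  # let the listener start and subscribe
    await stop(task)
    return task.cancelled()


cancelled = asyncio.run(main())
print(log, cancelled)
```

In the endpoint, the body of stop() would go into on_disconnect in place of the bare cancel() call.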

laxuscullen commented 3 months ago

@alex-oleshkevich first of all, I am very thankful to you for your time and help. Really. The code works. Tested using gunicorn (tested with 10 w and 10 t) and uvicorn.