django / channels_redis

Redis channel layer backend for Django Channels
BSD 3-Clause "New" or "Revised" License
601 stars 197 forks source link

Failing test caused other tests to fail with RuntimeError: Event loop is closed on pytest #367

Closed maribedran closed 10 months ago

maribedran commented 1 year ago

I have multiple tests for a websocket consumer where one failing test causes other tests that pass when run on their own to fail.

I initially commented on it here

The issue only happens on tests that use more than one communicator, so they're rather long. Sorry about that.

Django==4.0.10
channels==4.0.0
channels-redis==4.1.0
pytest==7.3.2
pytest-asyncio==0.21.0
pytest-django==4.5.2
redis==4.5.5
@pytest.mark.asyncio
async def test_failing():
    communicator_1 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    communicator_2 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    connected_1, _ = await communicator_1.connect()
    connected_2, _ = await communicator_2.connect()
    assert connected_1
    assert connected_2
    await communicator_1.send_json_to({"report": 1, "action": "join"})
    comm_1_msg = await communicator_1.receive_json_from()
    assert comm_1_msg == {"others": 0, "locked": False, "editing": False}
    await communicator_2.send_json_to({"report": 2, "action": "join"})
    comm_2_msg = await communicator_2.receive_json_from()
    # ERROR here:
    assert comm_2_msg == {"others": 0, "locked": False, "editing": True}
    assert await communicator_1.receive_nothing()
    assert await communicator_2.receive_nothing()
    await communicator_1.disconnect()
    await communicator_2.disconnect()

@pytest.mark.asyncio
async def test_passing():
    communicator_1 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    communicator_2 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    connected_1, _ = await communicator_1.connect()
    connected_2, _ = await communicator_2.connect()
    assert connected_1
    assert connected_2
    await communicator_1.send_json_to({"report": 1, "action": "join"})
    comm_1_msg = await communicator_1.receive_json_from()
    assert comm_1_msg == {"others": 0, "locked": False, "editing": False}
    await communicator_2.send_json_to({"report": 2, "action": "join"})
    comm_2_msg = await communicator_2.receive_json_from()
    assert comm_2_msg == {"others": 0, "locked": False, "editing": False}
    assert await communicator_1.receive_nothing()
    assert await communicator_2.receive_nothing()
    await communicator_1.disconnect()
    await communicator_2.disconnect()

The first test fails as expected:

>       assert comm_2_msg == {"others": 0, "locked": False, "editing": True}
E       AssertionError: assert {'editing': F..., 'others': 0} == {'editing': T..., 'others': 0}
E         Omitting 2 identical items, use -vv to show
E         Differing items:
E         {'editing': False} != {'editing': True}
E         Use -v to get more diff

backend/ws_api_v1/tests/test_edit_report.py:412: AssertionError

The second one passes when run alone and raises a RuntimeError when run together with the first one:

_____________________________________________ test_passing _____________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7ff35cf1ee90>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/venv/lib/python3.10/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7ff35cf1ed40 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
>               await getter
E               asyncio.exceptions.CancelledError

/usr/lib/python3.10/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7ff35cf1ee90>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
>           async with async_timeout(timeout):

/venv/lib/python3.10/site-packages/asgiref/testing.py:73: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7ff35cf1ea40>
exc_type = <class 'asyncio.exceptions.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7ff35d1ca340>

    async def __aexit__(
        self,
        exc_type: Type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ) -> None:
>       self._do_exit(exc_type)

/venv/lib/python3.10/site-packages/asgiref/timeout.py:71: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7ff35cf1ea40>
exc_type = <class 'asyncio.exceptions.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           asyncio.exceptions.TimeoutError

/venv/lib/python3.10/site-packages/asgiref/timeout.py:108: TimeoutError

During handling of the above exception, another exception occurred:

    @pytest.mark.asyncio
    async def test_passing():
        communicator_1 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        communicator_2 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        connected_1, _ = await communicator_1.connect()
        connected_2, _ = await communicator_2.connect()
        assert connected_1
        assert connected_2
        await communicator_1.send_json_to({"report": 1, "action": "join"})
>       comm_1_msg = await communicator_1.receive_json_from()

backend/ws_api_v1/tests/test_edit_report.py:428: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/venv/lib/python3.10/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
/venv/lib/python3.10/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
/venv/lib/python3.10/site-packages/asgiref/testing.py:78: in receive_output
    self.future.result()
/venv/lib/python3.10/site-packages/channels/consumer.py:94: in app
    return await consumer(scope, receive, send)
/venv/lib/python3.10/site-packages/channels/consumer.py:58: in __call__
    await await_many_dispatch(
/venv/lib/python3.10/site-packages/channels/utils.py:50: in await_many_dispatch
    await dispatch(result)
/venv/lib/python3.10/site-packages/channels/consumer.py:73: in dispatch
    await handler(message)
/venv/lib/python3.10/site-packages/channels/generic/websocket.py:194: in websocket_receive
    await self.receive(text_data=message["text"])
/venv/lib/python3.10/site-packages/channels/generic/websocket.py:257: in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
backend/ws_api_v1/edit_report/consumers.py:38: in receive_json
    await self.send_to_group(
backend/ws_api_v1/edit_report/consumers.py:77: in send_to_group
    await self.channel_layer.group_send(f"report-{self.report}", payload)
/venv/lib/python3.10/site-packages/channels/layers.py:347: in group_send
    await self.send(channel, message)
/venv/lib/python3.10/site-packages/channels/layers.py:234: in send
    await queue.put((time.time() + self.expiry, deepcopy(message)))
/usr/lib/python3.10/asyncio/queues.py:136: in put
    return self.put_nowait(item)
/usr/lib/python3.10/asyncio/queues.py:148: in put_nowait
    self._wakeup_next(self._getters)
/usr/lib/python3.10/asyncio/queues.py:64: in _wakeup_next
    waiter.set_result(None)
/usr/lib/python3.10/asyncio/base_events.py:750: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.10/asyncio/base_events.py:515: RuntimeError
maribedran commented 1 year ago

@ipmb figured out how to solve it.

If this is expected behavior, maybe the docs should reflect it. I don't know async Python enough to understand why the disconnect has to be moved to the fixture.

@pytest_asyncio.fixture
async def communicator_1():
    communicator = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    connected, _ = await communicator.connect()
    assert connected
    yield communicator
    assert await communicator.receive_nothing()
    await communicator.disconnect()

@pytest_asyncio.fixture
async def communicator_2():
    communicator = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
    connected, _ = await communicator.connect()
    assert connected
    yield communicator
    assert await communicator.receive_nothing()
    await communicator.disconnect()

@pytest.mark.asyncio
async def test_failing(communicator_1, communicator_2):
    await communicator_1.send_json_to({"report": 1, "action": "join"})
    comm_1_msg = await communicator_1.receive_json_from()
    assert comm_1_msg == {"others": 0, "locked": False, "editing": False}
    await communicator_2.send_json_to({"report": 2, "action": "join"})
    comm_2_msg = await communicator_2.receive_json_from()
    # ERROR here:
    assert comm_2_msg == {"others": 0, "locked": False, "editing": True}

@pytest.mark.asyncio
async def test_passing(communicator_1, communicator_2):
    await communicator_1.send_json_to({"report": 1, "action": "join"})
    comm_1_msg = await communicator_1.receive_json_from()
    assert comm_1_msg == {"others": 0, "locked": False, "editing": False}
    await communicator_2.send_json_to({"report": 2, "action": "join"})
    comm_2_msg = await communicator_2.receive_json_from()
    assert comm_2_msg == {"others": 0, "locked": False, "editing": False}
carltongibson commented 1 year ago

OK... so... I guess the issue is about event loops (and how pytest is creating/managing those) TBH I'd have to do some research into that… but the error must be leading to a loop shutdown. 🤔

It's coming up in your search because channels_redis was hitting this issue in 4.0, due to the new redis-py requirement to shut async connections before shutting down the event loop — which happens on each call to async_to_sync(). (Channels Redis 4.1 does that automatically for you.)

If we can get a phrasing right, I'd happily take an admonition to the Channels testing docs.

maribedran commented 1 year ago

This is really outside of my skill set to even begin investigating, but if you think this is an issue with how pytest-asyncio handles event loops on test failures I'd be happy to create an issue over there.

carltongibson commented 1 year ago

@maribedran I suspect that it's expected behaviour... I'd have to go and read their docs.

carltongibson commented 10 months ago

OK, I don't think this is addressable here. Happy to reopen/take a PR if there's concrete thoughts to the contrary.