django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

TimeoutError with pytest when group_discard is used #364 #2004

Closed (samupl closed 1 year ago)

samupl commented 1 year ago

Summary

I was working on a legacy project that used channels==2.4.0 and channels-redis==2.4.2, and I decided it was about time to update the packages (to channels==4.0.0 and channels-redis==4.1.0, see the pip freeze below).

After updating, obviously a lot of tests failed. I managed to fix them all except this one. I tried to narrow it down to a minimal reproduction as much as possible.

If a consumer does something like this:

class ExampleConsumer(AsyncJsonWebsocketConsumer):
    ...
    async def connect(self):
        await self.channel_layer.group_add('test', self.channel_name)

    async def disconnect(self, code):
        await self.channel_layer.group_discard('test', self.channel_name)

And we have a simple pytest test that verifies this consumer:

@pytest.mark.django_db(transaction=True)
class TestChatCommunicator:

    @pytest.mark.asyncio
    async def test_reproduce_exception(self, connected_communicator: QSWebsocketCommunicator) -> None:
        await connected_communicator.send_json_to({
            'command': 'raise'
        })
        response = await connected_communicator.receive_json_from()
        assert response == {'status': 'raised'}

    @pytest.mark.asyncio
    async def test_reproduce_ok(self, connected_communicator: QSWebsocketCommunicator) -> None:
        await connected_communicator.send_json_to({
            'command': 'ok'
        })
        response = await connected_communicator.receive_json_from()
        assert response == {'status': 'ok'}

pytest with pytest-asyncio isn't able to run simple tests that verify this consumer - the tests fail with a TimeoutError.

If both connect and disconnect methods are commented out, the tests pass just fine.

Repository with reproduction: https://github.com/samupl/channels-redis-asyncio-pytest-reproduce

OS

Docker, python:3.11-slim image, Linux 866362d15f23 6.2.8-200.fc37.x86_64.

Seems to affect any Linux distro; not sure about other platforms though.

Pip freeze

asgiref==3.6.0
attrs==22.2.0
autobahn==23.1.2
Automat==22.10.0
certifi==2022.12.7
cffi==1.15.1
channels==4.0.0
channels-redis==4.1.0
constantly==15.1.0
cryptography==40.0.2
daphne==4.0.0
distlib==0.3.6
Django==4.2
django-redis==5.2.0
execnet==1.9.0
filelock==3.11.0
hyperlink==21.0.0
idna==3.4
incremental==22.10.0
iniconfig==2.0.0
msgpack==1.0.5
packaging==23.1
pipenv==2023.3.20
platformdirs==3.2.0
pluggy==1.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
pyOpenSSL==23.1.1
pytest==7.3.1
pytest-asyncio==0.21.0
pytest-django==4.5.2
pytest-sugar==0.9.7
pytest-xdist==3.2.1
python-decouple==3.8
redis==4.5.4
service-identity==21.1.0
six==1.16.0
sqlparse==0.4.3
termcolor==2.2.0
Twisted==22.10.0
txaio==23.1.1
typing_extensions==4.5.0
virtualenv==20.21.0
virtualenv-clone==0.5.7
zope.interface==6.0

Expected vs actual


How you're running Channels

Irrelevant, happens in tests

Console logs and full tracebacks of any errors

django: settings: channels_pytest_issue.settings (from ini)
rootdir: /app
configfile: pytest.ini
plugins: sugar-0.9.7, xdist-3.2.1, asyncio-0.21.0, django-4.5.2
asyncio: mode=Mode.STRICT
collected 2 items                                                                                                                                                                                                       

―――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――― ERROR at setup of TestChatCommunicator.test_reproduce_exception ―――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――

self = <example_app.tests.conftest.QSWebsocketCommunicator object at 0x7f1ced8e6dd0>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/usr/local/lib/python3.11/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7f1ced944cd0 maxsize=0>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
>               await getter
E               asyncio.exceptions.CancelledError

/usr/local/lib/python3.11/asyncio/queues.py:158: CancelledError

During handling of the above exception, another exception occurred:

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>, request = <SubRequest 'connected_communicator' for <Function test_reproduce_exception>>, kwargs = {}, func = <function connected_communicator at 0x7f1ced9e56c0>, setup = <function _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.setup at 0x7f1ced753880>
finalizer = <function _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer at 0x7f1ced7537e0>

    @functools.wraps(fixture)
    def _asyncgen_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )
        gen_obj = func(**_add_kwargs(func, kwargs, event_loop, request))

        async def setup():
            res = await gen_obj.__anext__()
            return res

        def finalizer() -> None:
            """Yield again, to finalize."""

            async def async_finalizer() -> None:
                try:
                    await gen_obj.__anext__()
                except StopAsyncIteration:
                    pass
                else:
                    msg = "Async generator fixture didn't stop."
                    msg += "Yield only once."
                    raise ValueError(msg)

            event_loop.run_until_complete(async_finalizer())

>       result = event_loop.run_until_complete(setup())

/usr/local/lib/python3.11/site-packages/pytest_asyncio/plugin.py:298: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.11/asyncio/base_events.py:653: in run_until_complete
    return future.result()
/usr/local/lib/python3.11/site-packages/pytest_asyncio/plugin.py:280: in setup
    res = await gen_obj.__anext__()
example_app/tests/conftest.py:52: in connected_communicator
    communicator = await get_connected_communicator()
example_app/tests/conftest.py:43: in get_connected_communicator
    connected, subprotocol = await communicator.connect()
/usr/local/lib/python3.11/site-packages/channels/testing/websocket.py:36: in connect
    response = await self.receive_output(timeout)
/usr/local/lib/python3.11/site-packages/asgiref/testing.py:85: in receive_output
    raise e
/usr/local/lib/python3.11/site-packages/asgiref/testing.py:73: in receive_output
    async with async_timeout(timeout):
/usr/local/lib/python3.11/site-packages/asgiref/timeout.py:71: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7f1ced91f050>, exc_type = <class 'asyncio.exceptions.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           TimeoutError

/usr/local/lib/python3.11/site-packages/asgiref/timeout.py:108: TimeoutError
                                                                                                                                                                                                                                                                                                                                                                                50% █████     

―――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――― ERROR at setup of TestChatCommunicator.test_reproduce_ok ――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――

self = <example_app.tests.conftest.QSWebsocketCommunicator object at 0x7f1cec32bd90>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/usr/local/lib/python3.11/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7f1cec32bd10 maxsize=0>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
>               await getter
E               asyncio.exceptions.CancelledError

/usr/local/lib/python3.11/asyncio/queues.py:158: CancelledError

During handling of the above exception, another exception occurred:

event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>, request = <SubRequest 'connected_communicator' for <Function test_reproduce_ok>>, kwargs = {}, func = <function connected_communicator at 0x7f1ced9e56c0>, setup = <function _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.setup at 0x7f1ced7dc4a0>
finalizer = <function _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer at 0x7f1ced969e40>

    @functools.wraps(fixture)
    def _asyncgen_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )
        gen_obj = func(**_add_kwargs(func, kwargs, event_loop, request))

        async def setup():
            res = await gen_obj.__anext__()
            return res

        def finalizer() -> None:
            """Yield again, to finalize."""

            async def async_finalizer() -> None:
                try:
                    await gen_obj.__anext__()
                except StopAsyncIteration:
                    pass
                else:
                    msg = "Async generator fixture didn't stop."
                    msg += "Yield only once."
                    raise ValueError(msg)

            event_loop.run_until_complete(async_finalizer())

>       result = event_loop.run_until_complete(setup())

/usr/local/lib/python3.11/site-packages/pytest_asyncio/plugin.py:298: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.11/asyncio/base_events.py:653: in run_until_complete
    return future.result()
/usr/local/lib/python3.11/site-packages/pytest_asyncio/plugin.py:280: in setup
    res = await gen_obj.__anext__()
example_app/tests/conftest.py:52: in connected_communicator
    communicator = await get_connected_communicator()
example_app/tests/conftest.py:43: in get_connected_communicator
    connected, subprotocol = await communicator.connect()
/usr/local/lib/python3.11/site-packages/channels/testing/websocket.py:36: in connect
    response = await self.receive_output(timeout)
/usr/local/lib/python3.11/site-packages/asgiref/testing.py:85: in receive_output
    raise e
/usr/local/lib/python3.11/site-packages/asgiref/testing.py:73: in receive_output
    async with async_timeout(timeout):
/usr/local/lib/python3.11/site-packages/asgiref/timeout.py:71: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7f1cec328050>, exc_type = <class 'asyncio.exceptions.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           TimeoutError

/usr/local/lib/python3.11/site-packages/asgiref/timeout.py:108: TimeoutError
                                                                                                                                                                                                                                                                                                                                                                               100% ██████████
================================================================================================================================================================================== short test summary info ===================================================================================================================================================================================
FAILED example_app/tests/test_reproduce.py::TestChatCommunicator::test_reproduce_exception - TimeoutError
FAILED example_app/tests/test_reproduce.py::TestChatCommunicator::test_reproduce_ok - TimeoutError

Results (2.26s):
       2 error

carltongibson commented 1 year ago

Hmmm. Good report.

What happens if you construct the consumer inside the test method rather than passing it in as a fixture?

samupl commented 1 year ago

@carltongibson This doesn't seem to help, tests fail with the same error.

To make testing easier, I added GitHub Actions to my reproduction repo so that I can test various workarounds through pull requests.

You can see the build failing here: https://github.com/samupl/channels-redis-asyncio-pytest-reproduce/pull/1

carltongibson commented 1 year ago

OK, thanks — that makes the test easier to read if nothing else. (I was just wondering if something about being a pytest fixture was interfering).

"If both connect and disconnect methods are commented out, the tests pass just fine."

Does it block on both connect and disconnect, or just one of them? (You say group_discard in the issue title.) Update: CI shows it blocking on the connect call.

And just to check, it works in an application but fails in the test, yes? (If I'm reading you correctly...?)

carltongibson commented 1 year ago

Did you try calling super() (which actually sends the self.accept() message)? 🤔
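
If the missing accept is the cause, the failure would look roughly like the sketch below. This is a simplified stdlib-only model, not the Channels implementation: FakeChannelLayer, BaseConsumer, BrokenConsumer, FixedConsumer and try_connect are hypothetical names made up for illustration. It only mirrors what the traceback shows, namely a test client waiting on an output queue with a timeout for the handshake reply.

```python
import asyncio


class FakeChannelLayer:
    """Hypothetical in-memory stand-in for a channel layer (not the Channels API)."""

    def __init__(self):
        self.groups = {}

    async def group_add(self, group, channel):
        self.groups.setdefault(group, set()).add(channel)

    async def group_discard(self, group, channel):
        self.groups.get(group, set()).discard(channel)


class BaseConsumer:
    """Simplified model of a websocket consumer whose connect() accepts by default."""

    def __init__(self, output_queue):
        self.output_queue = output_queue
        self.channel_layer = FakeChannelLayer()
        self.channel_name = "test-channel"

    async def accept(self):
        # The handshake reply that the test client is waiting for.
        await self.output_queue.put({"type": "websocket.accept"})

    async def connect(self):
        await self.accept()


class BrokenConsumer(BaseConsumer):
    async def connect(self):
        # Overrides connect() without accepting: no reply is ever queued.
        await self.channel_layer.group_add("test", self.channel_name)


class FixedConsumer(BaseConsumer):
    async def connect(self):
        await self.channel_layer.group_add("test", self.channel_name)
        # Delegating to the base class sends the accept message.
        await super().connect()


async def try_connect(consumer_cls, timeout=0.1):
    """Run connect() and wait for the handshake reply, as a test client would."""
    queue = asyncio.Queue()
    consumer = consumer_cls(queue)
    await consumer.connect()
    try:
        reply = await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return "timeout"
    return reply["type"]


results = {
    "broken": asyncio.run(try_connect(BrokenConsumer)),
    "fixed": asyncio.run(try_connect(FixedConsumer)),
}
print(results)
```

In this model the broken variant times out on the empty queue, while the fixed one gets its reply immediately. In a real consumer the equivalent change would be calling super().connect() or awaiting self.accept() inside the overridden connect().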

samupl commented 1 year ago

I think you're right - that call to super() actually seems to make the tests pass, and the app to work correctly.

It was my mistake, because while the whole app generally seemed to work, I must have failed to test this particular consumer. I made the app on my reproduction repo work and tested it with websocat only to discover that it never went past the websocket handshake.

I added super(), which calls accept() as you said and it seems the issue is resolved: https://github.com/samupl/channels-redis-asyncio-pytest-reproduce/pull/2

However, in the "real" app I'm working on, there already seems to be an await self.accept() in the connect method, so I need to debug it further and come back with a better reproduction.

carltongibson commented 1 year ago

OK, I'm going to close for now. Please follow up if you find more and it's an issue in Channels; we can happily re-open.