django / channels_redis

Redis channel layer backend for Django Channels
BSD 3-Clause "New" or "Revised" License

v4b1: RuntimeError: Event loop is closed #312

Closed nielsuit227 closed 2 years ago

nielsuit227 commented 2 years ago

I'm not sure whether the root cause is in the channels_redis 4.0.0b1 release or in the Celery 5.3.0a1 release from two days ago, but since today we've been getting a recurring error in our CI and, after updating the local environment, locally as well.

Before these two updates, everything worked. With them, I get the recurring error shown below.

OS: Windows 10 & Ubuntu 20.04.4

Pip freeze:

alembic==1.8.0
amqp==5.1.1
APScheduler==3.9.1
argparse-addons==0.8.0
asgiref==3.5.2
async-timeout==4.0.2
atomicwrites==1.4.0
attrs==21.4.0
autobahn==22.5.1
Automat==20.2.0
autopage==0.5.1
azure-core==1.24.1
azure-storage-blob==12.12.0
billiard==4.0.0
bitstruct==8.15.1
boto3==1.24.20
botocore==1.27.20
cachetools==5.2.0
cantools==37.1.0
catboost==1.0.6
celery==5.3.0a1
certifi==2022.6.15
cffi==1.15.0
channels==3.0.5
channels-redis==4.0.0b1
charset-normalizer==2.1.0
click==8.1.3
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
cliff==3.10.1
cloudpickle==2.1.0
cmaes==0.8.2
cmd2==2.4.1
colorama==0.4.5
colorlog==6.6.0
constantly==15.1.0
cryptography==37.0.2
cycler==0.11.0
daphne==3.0.2
defusedxml==0.7.1
Deprecated==1.2.13
diskcache==5.4.0
dj-rest-auth==2.2.4
Django==4.0.5
django-allauth==0.51.0
django-apscheduler==0.6.2
django-celery-beat==2.3.0
django-cors-headers==3.13.0
django-filter==22.1
django-redis==5.2.0
django-timezone-field==5.0
djangorestframework==3.13.1
faiss-cpu==1.7.2
fonttools==4.33.3
google-api-core==2.8.2
google-auth==2.9.0
google-cloud-core==2.3.1
google-cloud-storage==2.4.0
google-crc32c==1.3.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.3
graphviz==0.20
greenlet==1.1.2
gunicorn==20.1.0
hyperlink==21.0.0
idna==3.3
imbalanced-learn==0.8.0
incremental==21.3.0
iniconfig==1.1.1
isodate==0.6.1
jmespath==1.0.1
joblib==1.1.0
kiwisolver==1.4.3
kombu==5.3.0a1
lightgbm==3.3.2
llvmlite==0.38.1
mailslurp-client==15.11.1
Mako==1.2.0
MarkupSafe==2.1.1
matplotlib==3.5.2
msgpack==1.0.4
msrest==0.7.1
numba==0.55.2
numpy==1.22.4
oauthlib==3.2.0
optuna==2.10.0
packaging==21.3
pandas==1.4.3
patsy==0.5.2
pbr==5.9.0
Pillow==9.1.1
plotly==5.9.0
pluggy==0.13.1
prettytable==3.3.0
prompt-toolkit==3.0.30
protobuf==4.21.2
psycopg2-binary==2.9.3
py==1.11.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
PyJWT==2.4.0
pyOpenSSL==22.0.0
pyparsing==3.0.9
pyperclip==1.8.2
pyreadline3==3.4.1
pytest==6.2.1
pytest-django==4.5.2
python-can==4.0.0
python-crontab==2.6.0
python-dateutil==2.8.2
python3-openid==3.2.0
pytz==2022.1
pytz-deprecation-shim==0.1.0.post0
pywin32==304
PyYAML==6.0
redis==4.3.4
requests==2.28.1
requests-oauthlib==1.3.1
rsa==4.8
s3transfer==0.6.0
scikit-learn==1.0.1
scipy==1.8.1
service-identity==21.1.0
shap==0.40.0
six==1.16.0
slicer==0.0.7
SQLAlchemy==1.4.39
sqlparse==0.4.2
statsmodels==0.13.2
stevedore==3.5.0
tenacity==8.0.1
termcolor==1.1.0
textparser==0.24.0
threadpoolctl==3.1.0
toml==0.10.2
tqdm==4.64.0
Twisted==22.4.0
twisted-iocpsupport==1.0.2
txaio==22.2.1
typing_extensions==4.2.0
tzdata==2022.1
tzlocal==4.2
Unipath==1.1
urllib3==1.26.9
vine==5.0.0
wcwidth==0.2.5
windows-curses==2.3.0
wrapt==1.14.1
xgboost==1.6.1
zope.interface==5.4.0

Traceback:

[2022-06-30 13:37:57,967: ERROR/MainProcess] Event loop is closed
Traceback (most recent call last):
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 931, in read_response
    response = await self._parser.read_response(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 387, in read_response
    raw = await self._buffer.readline()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 313, in readline
    await self._read_from_socket()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 253, in _read_from_socket
    data = await self._stream.read(self.socket_read_size)
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 684, in read
    await self._wait_for_data('read')
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-4' coro=<AsyncToSync.main_wrap() running at C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py:284> cb=[_run_until_complete_cb() at c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py:184]> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Anon\Project\backend\app\tasks.py", line 319, in upload
    async_to_sync(channel_layer.group_send)(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py", line 218, in __call__
    return call_result.result()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\concurrent\futures\_base.py", line 433, in result
    return self.__get_result()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py", line 284, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\channels_redis\core.py", line 570, in group_send
    await connection.zremrangebyscore(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 487, in execute_command
    return await conn.retry.call_with_retry(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\retry.py", line 59, in call_with_retry
    return await do()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 463, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 505, in parse_response
    response = await connection.read_response()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 943, in read_response
    await self.disconnect()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 353, in close
    return self._transport.close()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py", line 746, in call_soon
    self._check_closed()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[2022-06-30 13:38:04,626: ERROR/MainProcess] Event loop is closed
Traceback (most recent call last):
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 931, in read_response
    response = await self._parser.read_response(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 387, in read_response
    raw = await self._buffer.readline()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 313, in readline
    await self._read_from_socket()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 253, in _read_from_socket
    data = await self._stream.read(self.socket_read_size)
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 684, in read
    await self._wait_for_data('read')
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-10' coro=<AsyncToSync.main_wrap() running at C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py:284> cb=[_run_until_complete_cb() at c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py:184]> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Anon\Project\backend\app\tasks.py", line 319, in upload
    async_to_sync(channel_layer.group_send)(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py", line 218, in __call__
    return call_result.result()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\concurrent\futures\_base.py", line 433, in result
    return self.__get_result()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\asgiref\sync.py", line 284, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\channels_redis\core.py", line 570, in group_send
    await connection.zremrangebyscore(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 487, in execute_command
    return await conn.retry.call_with_retry(
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\retry.py", line 59, in call_with_retry
    return await do()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 463, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\client.py", line 505, in parse_response
    response = await connection.read_response()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 943, in read_response
    await self.disconnect()
  File "C:\Users\Anon\Project\backend\env\lib\site-packages\redis\asyncio\connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\streams.py", line 353, in close
    return self._transport.close()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\selector_events.py", line 700, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py", line 746, in call_soon
    self._check_closed()
  File "c:\users\anon\appdata\local\programs\python\python39\lib\asyncio\base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
carltongibson commented 2 years ago

Hi @nielsuit227, thanks for the report. Are you able to reduce this to a minimal reproduction?

bbrowning918 commented 2 years ago

I did test out the 4.0.0 beta on our staging env and it seemed to work in practice but our test suite blew up. I dug around and think it regresses on this issue:

https://github.com/django/channels/issues/859

The new redis-py connection bits likely can do something like: https://github.com/django/channels_redis/commit/f5e4799e11f472cc267598e9f78099a160f81550

The author of that fix might have some more insight on that as well, I am very new/inexperienced to the async and channels inner workings.

carltongibson commented 2 years ago

Hey @bbrowning918, thanks for looking!

...but our test suite blew up.

Would you be able to isolate that in an example? 🤔

bbrowning918 commented 2 years ago
import unittest

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

class ChannelsRedisTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.channel_layer = get_channel_layer()

    def test_call_group_send_once(self):
        async_to_sync(self.channel_layer.group_send)('channel_1', {'message': 'message'})

    def test_call_group_send_twice(self):
        async_to_sync(self.channel_layer.group_send)('channel_1', {'message': 'message'})
        async_to_sync(self.channel_layer.group_send)('channel_2', {'message': 'message'})

In our usages we have group_send wrapped in async_to_sync for calling outside a Consumer. We use both Django signals or explicitly in a Django view to trigger these calls. Tests (unit and integration) can end up having more than 1 underlying wrapped group_send call occur when everything is all put together.

The above tests pass on:

asgiref = "==3.3.4"
channels-redis = "==3.3.0"

The above tests fail on:

asgiref = "==3.3.4"
channels-redis = "==4.0.0b1"

test_call_group_send_once passes but test_call_group_send_twice fails:

python manage.py test tests.test_channels_redis
System check identified no issues (0 silenced).
test_call_group_send_once (tests.test_channels_redis.ChannelsRedisTest) ... (0.007752s)
test_call_group_send_twice (tests.test_channels_redis.ChannelsRedisTest) ... E
======================================================================
ERROR: test_call_group_send_twice (tests.test_channels_redis.ChannelsRedisTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/python3.8/site-packages/redis/asyncio/connection.py", line 920, in read_response
    response = await self._parser.read_response(
  File "/python3.8/site-packages/redis/asyncio/connection.py", line 530, in read_response
    await self.read_from_socket()
  File "/python3.8/site-packages/redis/asyncio/connection.py", line 489, in read_from_socket
    buffer = await self._stream.read(self._read_size)
  File "/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-3' coro=<AsyncToSync.main_wrap() running at /python3.8/site-packages/asgiref/sync.py:287> cb=[_run_until_complete_cb() at /python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/webapp/tests/test_channels_redis.py", line 16, in test_call_group_send_twice
    async_to_sync(self.channel_layer.group_send)('channel_1', {'message': 'message'})
  File "/python3.8/site-packages/asgiref/sync.py", line 222, in __call__
    return call_result.result()
  File "/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/python3.8/site-packages/asgiref/sync.py", line 287, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/python3.8/site-packages/channels_redis/core.py", line 570, in group_send
    await connection.zremrangebyscore(
  File "/python3.8/site-packages/redis/asyncio/client.py", line 472, in execute_command
    return await conn.retry.call_with_retry(
  File "/python3.8/site-packages/redis/asyncio/retry.py", line 50, in call_with_retry
    return await do()
  File "/python3.8/site-packages/redis/asyncio/client.py", line 451, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
  File "/python3.8/site-packages/redis/asyncio/client.py", line 490, in parse_response
    response = await connection.read_response()
  File "/python3.8/site-packages/redis/asyncio/connection.py", line 932, in read_response
    await self.disconnect()
  File "/python3.8/site-packages/redis/asyncio/connection.py", line 824, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/python3.8/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/python3.8/asyncio/selector_events.py", line 692, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

----------------------------------------------------------------------
Ran 2 tests in 0.103s

FAILED (errors=1)
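
For context, the first RuntimeError in the traceback above ("attached to a different loop") can be reproduced with plain asyncio, without Redis. This is only a sketch, assuming the failure comes from an object that was cached while one event loop was running and is then awaited on a later loop. `_cache`, `use_cached_future` and `call_twice` are hypothetical names, not channels_redis code.

```python
import asyncio

# Hypothetical module-level cache, standing in for cached connections.
_cache = {}


async def use_cached_future():
    loop = asyncio.get_running_loop()
    if "fut" not in _cache:
        # The future is bound to the loop that is running right now.
        _cache["fut"] = loop.create_future()
        return "created"
    # On a later call a different loop is running, so awaiting the
    # cached future raises RuntimeError inside this coroutine.
    await _cache["fut"]


def call_twice():
    # Each asyncio.run() call creates and then closes its own event loop.
    first = asyncio.run(use_cached_future())
    try:
        asyncio.run(use_cached_future())
    except RuntimeError as exc:
        return first, str(exc)
    return first, None
```

Calling `call_twice()` returns the first call's result together with the error message from the second call, which mirrors the once/twice split in the tests above.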
carltongibson commented 2 years ago

Thanks @bbrowning918, that's super. 🏅

ipmb commented 2 years ago

This is the only place we call asyncio.get_event_loop() in the new code: https://github.com/django/channels_redis/blob/52e240e7d59b12df498dcdb0b1fc165919c0f3d0/channels_redis/core.py#L315

Does it make sense to replace it with the technique used in https://github.com/django/channels_redis/commit/f5e4799e11f472cc267598e9f78099a160f81550?

From a cursory glance, I don't see any hooks in redis-py to specify an event loop.

carltongibson commented 2 years ago

In general, get_running_loop() is preferred (indeed, get_event_loop() will be removed), so we should at least switch to that, assuming the event loop is already running here.
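
As a small illustration of the get_running_loop() behaviour mentioned here, using only the standard library: inside a coroutine it returns the loop that is currently running, and from synchronous code with no loop running it raises RuntimeError. The helper names `current_loop` and `loop_outside_coroutine` are made up for this sketch.

```python
import asyncio


async def current_loop():
    # Inside a coroutine there is always a running loop to return.
    return asyncio.get_running_loop()


def loop_outside_coroutine():
    # From sync code with no loop running, get_running_loop() raises
    # instead of handing back a loop that may be stale or closed.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)
    return None


# Two separate asyncio.run() calls run on two different loops,
# and each loop is closed once its call returns.
loop_a = asyncio.run(current_loop())
loop_b = asyncio.run(current_loop())
```

The two loops being distinct, and already closed by the time the calls return, is why anything that holds on to a loop (or to objects bound to one) across calls can break.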

carltongibson commented 2 years ago

OK, so experimenting with this a little:

Then...

@andrewgodwin's workaround from https://github.com/django/channels/issues/859#issuecomment-363179022 works...

Oh, yes, right, the connection caching code in channels_redis isn't going to like the way AsyncToSync works. This is a bug, but you can workaround it for now by using AsyncToSync in a class context:

So this passes:

import unittest

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

class ChannelsRedisTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.channel_layer = get_channel_layer()

    async def async_group_send(self, *args, **kwargs):
        self.channel_layer.group_send(*args, **kwargs)

    group_send = async_to_sync(async_group_send)

    def test_call_group_send_once(self):
        async_to_sync(self.channel_layer.group_send)('channel_1', {'message': 'message'})

    def test_call_group_send_twice(self):
        self.group_send('channel_1', {'message': 'message'})
        self.group_send('channel_2', {'message': 'message'})

The issue in django/channels#859 was resolved by f5e4799e11f472cc267598e9f78099a160f81550 (as channels_redis 2.0.3) so it looks like (yes) re-applying that here is the way to go.
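
A rough sketch of the general idea, assuming (this is not checked against the commit itself) that the fix amounts to keying cached connections by the event loop they were created on. `PerLoopCache` and `fetch_twice` are hypothetical names for illustration, and `object` stands in for a real connection factory; this is not the channels_redis implementation.

```python
import asyncio


class PerLoopCache:
    """Hypothetical helper: keeps one cached resource per event loop."""

    def __init__(self, factory):
        self._factory = factory
        self._by_loop = {}

    def get(self):
        loop = asyncio.get_running_loop()
        # Drop entries whose loop has been closed so they are never reused.
        for old in [l for l in self._by_loop if l.is_closed()]:
            del self._by_loop[old]
        if loop not in self._by_loop:
            self._by_loop[loop] = self._factory()
        return self._by_loop[loop]


# `object` is a placeholder for something that opens a connection.
cache = PerLoopCache(factory=object)


async def fetch_twice():
    return cache.get(), cache.get()


a1, a2 = asyncio.run(fetch_twice())  # same loop: the resource is reused
b1, _ = asyncio.run(fetch_twice())   # new loop: a fresh resource is created
```

With this shape, repeated async_to_sync calls would each get a resource bound to their own loop rather than one left over from a closed loop.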

carltongibson commented 2 years ago

@nielsuit227 This should have been resolved by #317. Can you give main a try to confirm if it works for you? Thanks,

carltongibson commented 2 years ago

4.0.0b2 is on PyPI now if you'd like to try it out.

Uninen commented 2 years ago

Thank you for working on this 🙏

I just tested this with django==4.1.1, channels==4.0.b1, channels-redis==4.0.0b2 and asgiref==3.5.2, and I'm still getting the same errors. I'm not sure if my test code is wrong (when the tests broke I just disabled the test, as the actual functionality was still OK; those tests have been skipped for a while now), but the error looks exactly the same as before:

====================================================== short test summary info =======================================================
FAILED pylib/core/tests/test_core_websockets.py::test_usermessage_consumer - AssertionError: assert {'message': '... 'user.toast'} ...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
======================================== 1 failed, 16 passed, 1 skipped, 2 warnings in 7.27s =========================================
Task was destroyed but it is pending!
task: <Task cancelling name='Task-13' coro=<CookieMiddleware.__call__() running at /proj/venv310/lib/python3.10/site-packages/channels/sessions.py:47> wait_for=<Future finished result=None>>
Exception ignored in: <coroutine object CookieMiddleware.__call__ at 0x10edf1af0>
Traceback (most recent call last):
  File "/proj/venv310/lib/python3.10/site-packages/channels/sessions.py", line 47, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/sessions.py", line 263, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/auth.py", line 185, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/middleware.py", line 24, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/routing.py", line 116, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
  File "/proj/venv310/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/utils.py", line 56, in await_many_dispatch
  File "/Users/uninen/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
  File "/Users/uninen/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-15' coro=<RedisChannelLayer.receive() running at /proj/venv310/lib/python3.10/site-packages/channels_redis/core.py:364> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object RedisChannelLayer.receive at 0x10edf2180>
Traceback (most recent call last):
  File "/proj/venv310/lib/python3.10/site-packages/channels_redis/core.py", line 364, in receive
  File "/proj/venv310/lib/python3.10/site-packages/channels_redis/core.py", line 419, in receive_single
  File "/proj/venv310/lib/python3.10/site-packages/channels_redis/core.py", line 257, in _brpop_with_clean
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/client.py", line 487, in execute_command
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/client.py", line 463, in _send_command_parse_response
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/client.py", line 505, in parse_response
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/connection.py", line 943, in read_response
  File "/proj/venv310/lib/python3.10/site-packages/redis/asyncio/connection.py", line 822, in disconnect
  File "/proj/venv310/lib/python3.10/site-packages/async_timeout/__init__.py", line 34, in timeout
  File "/proj/venv310/lib/python3.10/site-packages/async_timeout/__init__.py", line 239, in _get_running_loop
RuntimeError: no running event loop

and the test code:

@pytest.mark.asyncio
@pytest.mark.django_db
async def test_usermessage_consumer(transaction=True):
    user = await sync_to_async(baker.make)("core.User", username="testuser", pk=1)
    user = await database_sync_to_async(User.objects.get)(pk=1)

    application = AuthMiddlewareStack(
        URLRouter(proj.routing.websocket_urlpatterns)
    )

    communicator = WebsocketCommunicator(application, "/ws/user-messages/")
    communicator.scope["user"] = user
    connected, subprotocol = await communicator.connect()
    assert connected

    # Test on connection welcome message
    message = await communicator.receive_json_from()
    assert message == {"message": "welcome testuser"}

    await communicator.send_json_to({"message": "hello world"})
    message = await communicator.receive_json_from()
    assert message == {"message": "hello world"}

    cl = get_channel_layer()
    if cl is not None:
        await cl.group_send(
            f"user-{user.uid}",
            {
                "message": "hello world",
                "type": "backend.message",
                "message_type": "foo",
            },
        )
        message = await communicator.receive_from()
        assert type(message) == bytes

    with pytest.raises(TimeoutError):
        await communicator.send_json_to({"mesage": "hello world"})
        message = await communicator.receive_json_from()
        await communicator.wait(timeout=1)

    await communicator.disconnect()
bbrowning918 commented 2 years ago

In the same staging env I referenced above, on django==3.2.15, channels==4.0.0b1, channels-redis==4.0.0b2, asgiref==3.5.2, and python==3.8, our test suite passes. I am looking at whether we're non-breaking for django>=4.0.0 and whether everything is working as intended.

@Uninen What does WebsocketCommunicator do/look like? application just appears to be a middleware stack, not django's get_asgi_application().

Uninen commented 2 years ago

WebsocketCommunicator is documented in the testing docs. I tried simplifying the setup to not use the middleware but couldn't figure out a way that would work, since almost all our consumers rely on and use the request user.

The routing bit in my above example can also be written as:

    application = AuthMiddlewareStack(
        URLRouter(
            [
                re_path(r"ws/user-messages/$", core_consumers.UserMessageConsumer.as_asgi())
            ]
        )
    )
bbrowning918 commented 2 years ago

Those are very interesting helpers and I had no idea such a thing existed in the channels package, thank you.

I am thinking the original issue, my own test suite's issue, and yours are certainly all related to the event loop and tests; but are perhaps subtly different.

carltongibson commented 2 years ago

@Uninen Can I please ask you to just try adding a communicator.wait() at the bottom of the test case?

Why?

The "Task was destroyed but it is pending!" message looks like the test case is exiting before the tasks have a chance to shut down. If that's right, it would account for the RuntimeError: Event loop is closed and RuntimeError: no running event loop.

🤔

I'm wondering if making sure we give the channels layer time to shut down fixes it. (Or shutting it down if necessary…)
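
To illustrate the shutdown idea with plain asyncio: the sketch below cancels and awaits a background task in a `finally` block, so the task is finished even when an assertion fails partway through. `worker` is a hypothetical stand-in for a consumer waiting on the channel layer, and `run_check` is a made-up test body. Whether this is what the Channels test helpers need is an assumption.

```python
import asyncio


async def worker(queue):
    # Hypothetical long-lived task that blocks waiting for messages.
    while True:
        await queue.get()


async def run_check(value):
    queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    await asyncio.sleep(0)  # let the worker start and block on queue.get()
    try:
        assert value == "expected", "assertion failed"
    finally:
        # Cancel and await the task so nothing is left pending
        # when the event loop is closed.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return task.cancelled()
```

Running `asyncio.run(run_check("expected"))` returns once the worker has been cancelled. With any other value the AssertionError still propagates, but only after the cleanup has run.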

Uninen commented 2 years ago

Sorry for the slow response. I finally figured it out. The tests actually work, it's just that pytest for some reason displays a ton of all kinds of errors when assertions fail. For example:

This works:

    # Test user toast
    await cl.group_send(
        f"user-{user.uid}",
        {
            "type": "user.toast",
            "message": "hello world",
        },
    )
    message = await communicator.receive_from()
    assert type(message) == str

But changing the assertion produces a lengthy error stacktrace (instead of just the normal assertion error):

    assert type(message) == bytes
===================================================================================================== short test summary info ======================================================================================================
FAILED pylib/core/tests/test_core_websockets.py::test_usermessage_consumer - assert <class 'str'> == bytes
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
================================================================================================== 1 failed, 2 warnings in 5.42s ===================================================================================================
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<OriginValidator.__call__() running at /proj/venv310/lib/python3.10/site-packages/channels/security/websocket.py:37> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object OriginValidator.__call__ at 0x10e550f90>
Traceback (most recent call last):
  File "/proj/venv310/lib/python3.10/site-packages/channels/security/websocket.py", line 37, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/sessions.py", line 47, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/sessions.py", line 263, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/auth.py", line 185, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/middleware.py", line 24, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/routing.py", line 116, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
  File "/proj/venv310/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
  File "/proj/venv310/lib/python3.10/site-packages/channels/utils.py", line 56, in await_many_dispatch
  File "/Users/uninen/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
  File "/Users/uninen/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<RedisChannelLayer.receive() running at /proj/venv310/lib/python3.10/site-packages/channels_redis/core.py:275>>
sys:1: RuntimeWarning: coroutine 'RedisChannelLayer.receive' was never awaited
Task was destroyed but it is pending!
task: <Task pending name='Task-13' coro=<Queue.get() running at /Users/uninen/.pyenv/versions/3.10.3/lib/python3.10/asyncio/queues.py:159> wait_for=<Future cancelled>>

This makes debugging the tests pretty laborious, as sometimes the stack trace completely swallows everything else and none of the normal assertion errors are shown. But moving extremely slowly, line by line, I was able to fix all the tests, so the current b1/b2 stack does indeed work for me 👍

However, this warning remains no matter what I put in the tests: /proj/venv310/lib/python3.10/site-packages/redis/asyncio/connection.py:677: DeprecationWarning: There is no current event loop

carltongibson commented 2 years ago

@Uninen Super, thank you!

A lot of these errors (Task was destroyed but it is pending! etc.) are expected in the sense that the test aborted so the task didn't get cleaned up properly. ... That you resolved them is 👍

However, this warning remains no matter what I put in the tests: /proj/venv310/lib/python3.10/site-packages/redis/asyncio/connection.py:677: DeprecationWarning: There is no current event loop

That's a lingering get_event_loop() presumably. I need to go and look. get_running_loop() is the preferred API.

I'm going to close this. 💃 Looks like we're green. I will roll out the releases, only a few days late. 😜

Thanks all!

maribedran commented 1 year ago

I'm having a similar issue that seems related to this. Whenever one of my tests fails an assertion, other tests fail too.

Django==4.0.10
channels==4.0.0
channels-redis==4.1.0
pytest==7.3.2
pytest-asyncio==0.21.0
pytest-django==4.5.2

One test has a valid assertion error:


    @pytest.mark.asyncio
    async def test_group_messages_for_multiple_edit_actions():
        """
        If a second user wasn't on the group before and tries to edit (access the /edit route directly),
        they will be locked out
        """
        communicator_1 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        communicator_2 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        connected_1, _ = await communicator_1.connect()
        connected_2, _ = await communicator_2.connect()
        assert connected_1
        assert connected_2
        await communicator_1.send_json_to(
            {
                "report": 1,
                "action": "edit",
            }
        )
        comm_1_msg_1 = await communicator_1.receive_json_from()
        assert comm_1_msg_1 == {"others": 0, "locked": False, "editing": True}
        await communicator_2.send_json_to(
            {
                "report": 1,
                "action": "edit",
            }
        )
        com_2_msg_1 = await communicator_2.receive_json_from()
        assert com_2_msg_1 == {"others": 0, "locked": False, "editing": True}
        com_1_msg_2 = await communicator_1.receive_json_from()
        assert com_1_msg_2 == {"others": 1, "locked": False, "editing": True}
        com_2_msg_2 = await communicator_2.receive_json_from()
>       assert com_2_msg_2 == {"others": 1, "locked": False, "editing": False}
E       AssertionError: assert {'editing': F..., 'others': 1} == {'editing': F..., 'others': 1}
E         Omitting 2 identical items, use -vv to show
E         Differing items:
E         {'locked': True} != {'locked': False}
E         Use -v to get more diff

backend/ws_api_v1/tests/test_edit_report.py:140: AssertionError

Another test gets a RuntimeError when the communicator tries to access a message:

_________________________________ test_group_messages_for_take_control _________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7efcf3057250>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/venv/lib/python3.10/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7efcf30572b0 maxsize=0 tasks=1>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
>               await getter
E               asyncio.exceptions.CancelledError

/usr/lib/python3.10/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7efcf3057250>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
>           async with async_timeout(timeout):

/venv/lib/python3.10/site-packages/asgiref/testing.py:73: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7efcf3056500>
exc_type = <class 'asyncio.exceptions.CancelledError'>, exc_val = CancelledError()
exc_tb = <traceback object at 0x7efcf301aa80>

    async def __aexit__(
        self,
        exc_type: Type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ) -> None:
>       self._do_exit(exc_type)

/venv/lib/python3.10/site-packages/asgiref/timeout.py:71: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.timeout.timeout object at 0x7efcf3056500>
exc_type = <class 'asyncio.exceptions.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           asyncio.exceptions.TimeoutError

/venv/lib/python3.10/site-packages/asgiref/timeout.py:108: TimeoutError

During handling of the above exception, another exception occurred:

    @pytest.mark.asyncio
    async def test_group_messages_for_take_control():
        """
        If a second user was already on the group and wants to edit
        they will be locked out
        """
        communicator_1 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        communicator_2 = WebsocketCommunicator(EditReportConsumer.as_asgi(), "/ws/reports/")
        connected_1, _ = await communicator_1.connect()
        connected_2, _ = await communicator_2.connect()
        assert connected_1
        assert connected_2
        await communicator_1.send_json_to(
            {
                "report": 1,
                "action": "join",
            }
        )
>       comm_1_msg_1 = await communicator_1.receive_json_from()

backend/ws_api_v1/tests/test_edit_report.py:165: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/venv/lib/python3.10/site-packages/channels/testing/websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
/venv/lib/python3.10/site-packages/channels/testing/websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
/venv/lib/python3.10/site-packages/asgiref/testing.py:78: in receive_output
    self.future.result()
/venv/lib/python3.10/site-packages/channels/consumer.py:94: in app
    return await consumer(scope, receive, send)
/venv/lib/python3.10/site-packages/channels/consumer.py:58: in __call__
    await await_many_dispatch(
/venv/lib/python3.10/site-packages/channels/utils.py:50: in await_many_dispatch
    await dispatch(result)
/venv/lib/python3.10/site-packages/channels/consumer.py:73: in dispatch
    await handler(message)
/venv/lib/python3.10/site-packages/channels/generic/websocket.py:194: in websocket_receive
    await self.receive(text_data=message["text"])
/venv/lib/python3.10/site-packages/channels/generic/websocket.py:257: in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
backend/ws_api_v1/edit_report/consumers.py:38: in receive_json
    await self.send_to_group(
backend/ws_api_v1/edit_report/consumers.py:77: in send_to_group
    await self.channel_layer.group_send(f"report-{self.report}", payload)
/venv/lib/python3.10/site-packages/channels/layers.py:347: in group_send
    await self.send(channel, message)
/venv/lib/python3.10/site-packages/channels/layers.py:234: in send
    await queue.put((time.time() + self.expiry, deepcopy(message)))
/usr/lib/python3.10/asyncio/queues.py:136: in put
    return self.put_nowait(item)
/usr/lib/python3.10/asyncio/queues.py:148: in put_nowait
    self._wakeup_next(self._getters)
/usr/lib/python3.10/asyncio/queues.py:64: in _wakeup_next
    waiter.set_result(None)
/usr/lib/python3.10/asyncio/base_events.py:750: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.10/asyncio/base_events.py:515: RuntimeError

carltongibson commented 1 year ago

Hi @maribedran - you might need to open a new issue. (If you can reduce it to a smaller reproduction, that would be helpful.)

Worth experimenting with different versions of redis-py; they've fixed various issues over time.
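For anyone trying to reduce the report: the last traceback ends with `waiter.set_result(None)` reaching `call_soon()` on a loop that is already closed. The sketch below reproduces only that final step with plain asyncio. It is an illustration of the failure mode, not a diagnosis of the consumer or the channel layer, and the function name is hypothetical.

```python
import asyncio


def wake_waiter_on_closed_loop():
    # Create a future bound to a loop, as a queue does for a waiting getter.
    loop = asyncio.new_event_loop()
    waiter = loop.create_future()
    # A task awaiting the future would attach a wakeup callback like this.
    waiter.add_done_callback(lambda fut: None)
    # Simulate the owning test finishing and its loop being closed.
    loop.close()
    try:
        # Setting the result schedules the callback via loop.call_soon(),
        # which checks whether the loop is closed.
        waiter.set_result(None)
    except RuntimeError as exc:
        return str(exc)
    return None
```

If this matches the real cause, then some object holding waiters (a queue, for instance) outlived the event loop of the test that created it, which would fit the observation that one failing test makes later tests fail too.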