django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License
6.08k stars 801 forks source link

Call to `get_user` causes TimeoutError #1343

Open sbdchd opened 5 years ago

sbdchd commented 5 years ago

Problem

When calling get_user() inside a consumer, sometimes the tests fail with a TimeoutError. The problem goes away when I remove this call and use self.user = self.scope["user"] instead.

However in the intended use case the user session length is relatively short so get_user() would be helpful.

Reproduction

I made a fresh django project with channels installed available at: https://github.com/sbdchd/django-channels-bug-repo

setup:

poetry install

poetry run pytest mysite

The tests should fail although I have noticed it sometimes takes a second run of the tests for the TimeoutError to occur.

The error looks as follows:

poetry run pytest mysite ``` ❯ poetry run pytest mysite 3.24s ========================================= test session starts ========================================= platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0 Django settings: mysite.settings (from ini file) rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini plugins: django-3.5.1, asyncio-0.10.0 collected 1 item mysite/chat/test_consumers.py F [100%] ============================================== FAILURES =============================================== ____________________________________________ test_consumer ____________________________________________ self = , timeout = 1 async def receive_output(self, timeout=1): """ Receives a single message from the application, with optional timeout. """ # Make sure there's not an exception to raise from the task if self.future.done(): self.future.result() # Wait and receive the message try: async with async_timeout(timeout): > return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/testing.py:75: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. """ while self.empty(): getter = self._loop.create_future() self._getters.append(getter) try: > await getter E concurrent.futures._base.CancelledError /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError During handling of the above exception, another exception occurred: @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_consumer(): # 1. setup connections communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a")) connected, _ = await communicator.connect() assert connected comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) connected, _ = await comm_a.connect() assert connected comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) connected, _ = await comm_b.connect() assert connected # 3. subscribe to events subscribe = {"foo": "bar"} await communicator.send_json_to(subscribe) await comm_a.send_json_to(subscribe) await comm_b.send_json_to(subscribe) # 3. publish events channel_layer = get_channel_layer() await channel_layer.group_send( GROUP_NAME, { 'type': 'broadcast_message', } ) # 4. assert on received events expected_broadcast_msg = {"broadcast": "important update"} res = await communicator.receive_json_from() assert res == expected_broadcast_msg > res = await comm_a.receive_json_from() mysite/chat/test_consumers.py:59: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from payload = await self.receive_from(timeout) .venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from response = await self.receive_output(timeout) .venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output raise e .venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__ self._do_exit(exc_type) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exc_type = def _do_exit(self, exc_type: Type[BaseException]) -> None: if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None > raise asyncio.TimeoutError E concurrent.futures._base.TimeoutError .venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError ---------------------------------------- Captured stdout call ----------------------------------------- add me to the group plz a add me to the group plz c add me to the group plz b ========================================== 1 failed in 1.60s ========================================== Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x110b137c8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x110d62848> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> ```

Specs

OS: MacOS Python: Python 3.7.2

Deps

poetry show ``` ❯ poetry show aioredis 1.2.0 asyncio (PEP 3156) Redis support appdirs 1.4.3 A small Python module for determining appropriate platform-specific dirs, e.g. a "user data dir". appnope 0.1.0 Disable App Nap on OS X 10.9 argh 0.26.2 An unobtrusive argparse wrapper with natural syntax asgiref 3.2.1 ASGI specs, helper code, and adapters asn1crypto 0.24.0 Fast ASN.1 parser and serializer with definitions for private keys, public keys, certificates, CRL, OCSP, CMS, PKCS#3, PKCS#7, PKCS#8, PKCS#12, PKCS#5, X.509 and TSP astroid 2.2.5 An abstract syntax tree for Python with inference support. async-timeout 3.0.1 Timeout context manager for asyncio programs atomicwrites 1.3.0 Atomic file writes. attrs 19.1.0 Classes Without Boilerplate autobahn 19.8.1 WebSocket client & server library, WAMP real-time framework automat 0.7.0 Self-service finite-state machines for the programmer on the go. backcall 0.1.0 Specifications for callback functions passed in to an API black 18.9b0 The uncompromising code formatter. certifi 2019.3.9 Python package for providing Mozilla's CA Bundle. cffi 1.12.3 Foreign Function Interface for Python calling C code. channels 2.2.0 Brings async, event-driven capabilities to Django. Django 1.11 and up only. channels-redis 2.4.0 Redis-backed ASGI channel layer implementation chardet 3.0.4 Universal encoding detector for Python 2 and 3 click 7.0 Composable command line interface toolkit colorama 0.4.1 Cross-platform colored terminal text. constantly 15.1.0 Symbolic constants in Python coreapi 2.3.3 Python client library for Core API. coreschema 0.0.4 Core Schema. coverage 4.5.2 Code coverage measurement for Python cryptography 2.7 cryptography is a package which provides cryptographic recipes and primitives to Python developers. daphne 2.3.0 Django ASGI (HTTP/WebSocket) server dataclasses 0.6 A backport of the dataclasses module for Python 3.6 decorator 4.3.2 Better living through Python with decorators defusedxml 0.5.0 XML bomb protection for Python stdlib modules dj-database-url 0.4.2 Use Database URLs in your Django Application. django 2.2 A high-level Python Web framework that encourages rapid development and clean, pragmatic design. django-allauth 0.39.1 Integrated set of Django applications addressing authentication, registration, account management as well as 3rd party (social) account authentication. django-softdelete 0.9.0 Soft delete support for Django ORM, with undelete. django-user-sessions 1.6.0 Django sessions with a foreign key to the user djangorestframework 3.9.2 Web APIs for Django, made easy. docopt 0.6.2 Pythonic argument parser, that will make you smile drf-nested-routers 0.90.0 Nested resources for the Django Rest Framework drf-yasg 1.16.0 Automated generation of real Swagger/OpenAPI 2.0 schemas from Django Rest Framework code. entrypoints 0.3 Discover and load entry points from installed packages. flake8 3.7.7 the modular source code checker: pep8, pyflakes and co flake8-comprehensions 2.1.0 A flake8 plugin to help you write better list/set/dict comprehensions. flake8-formatter-junit-xml 0.0.5 JUnit XML Formatter for flake8 flake8-pie 0.4.0 A flake8 extension that implements misc. lints flake8-print 3.1.0 print statement checker plugin for flake8 gunicorn 19.7.1 WSGI HTTP Server for UNIX h11 0.8.1 A pure-Python, bring-your-own-I/O implementation of HTTP/1.1 hiredis 1.0.0 Python wrapper for hiredis honcho 1.0.1 Honcho: a Python clone of Foreman. For managing Procfile-based applications. httptools 0.0.13 A collection of framework independent HTTP protocol utils. hyperlink 19.0.0 A featureful, immutable, and correct URL for Python. icdiff 1.9.1 improved colored diff idna 2.8 Internationalized Domain Names in Applications (IDNA) incremental 17.5.0 inflection 0.3.1 A port of Ruby on Rails inflector to Python ipdb 0.11 IPython-enabled pdb ipython 7.3.0 IPython: Productive Interactive Computing ipython-genutils 0.2.0 Vestigial utilities from IPython isort 4.3.15 A Python utility / library to sort Python imports. itypes 1.1.0 Simple immutable types for python. jedi 0.13.3 An autocompletion tool for Python that can be used for text editors. jinja2 2.10.1 A small but fast and easy to use stand-alone template engine written in pure python. junit-xml 1.8 Creates JUnit XML test result documents that can be read by tools such as Jenkins lazy-object-proxy 1.3.1 A fast and thorough lazy object proxy. markupsafe 1.1.1 Safely add untrusted strings to HTML/XML markup. mccabe 0.6.1 McCabe checker, plugin for flake8 more-itertools 7.0.0 More routines for operating on iterables, beyond itertools msgpack 0.6.1 MessagePack (de)serializer. mypy 0.720 Optional static typing for Python mypy-extensions 0.4.1 Experimental type system extensions for programs checked with the mypy typechecker. oauthlib 3.0.1 A generic, spec-compliant, thorough implementation of the OAuth request-signing logic packaging 19.0 Core utilities for Python packages parso 0.3.4 A Python Parser pathtools 0.1.2 File system general utilities pexpect 4.6.0 Pexpect allows easy control of interactive console applications. pickleshare 0.7.5 Tiny 'shelve'-like database with concurrency support pint 0.8.1 Physical quantities module pluggy 0.9.0 plugin and hook calling mechanisms for python pprintpp 0.4.0 A drop-in replacement for pprint that's actually pretty prompt-toolkit 2.0.9 Library for building powerful interactive command lines in Python psycopg2 2.7.3.2 psycopg2 - Python-PostgreSQL Database Adapter ptyprocess 0.6.0 Run a subprocess in a pseudo terminal py 1.8.0 library with cross-python path, ini-parsing, io, code, log facilities pycodestyle 2.5.0 Python style guide checker pycparser 2.19 C parser in Python pydantic 0.32.2 Data validation and settings management using python 3.6 type hinting pyflakes 2.1.1 passive checker of Python programs pygments 2.3.1 Pygments is a syntax highlighting package written in Python. pyhamcrest 1.9.0 Hamcrest framework for matcher objects pylint 2.3.1 python code static checker pyparsing 2.3.1 Python parsing module pytest 3.10.1 pytest: simple powerful testing with Python pytest-asyncio 0.10.0 Pytest support for asyncio. pytest-cov 2.6.1 Pytest plugin for measuring coverage. pytest-django 3.4.8 A Django plugin for pytest. pytest-icdiff 0.2 use icdiff for better error messages in pytest assertions pytest-sugar 0.9.2 pytest-sugar is a plugin for pytest that changes the default look and feel of pytest (e.g. progressbar, show tests that fail instantly). pytest-testmon 0.9.16 take TDD to a new level with py.test and testmon pytest-watch 4.2.0 Local continuous test runner with pytest and watchdog. python-dotenv 0.10.1 Add .env support to your django/flask apps in development and deployments python-json-logger 0.1.8 A python library adding a json log formatter python3-openid 3.1.0 OpenID support for modern servers and consumers. pytz 2018.9 World timezone definitions, modern and historical pywatchman 1.4.1 Watchman client for python pyyaml 5.1 YAML parser and emitter for Python requests 2.21.0 Python HTTP for Humans. requests-oauthlib 1.2.0 OAuthlib authentication support for Requests. rope 0.14.0 a python refactoring library... ruamel.yaml 0.15.97 ruamel.yaml is a YAML parser/emitter that supports roundtrip preservation of comments, seq/map flow style, and map key order sentry-sdk 0.7.6 Python client for Sentry (https://getsentry.com) six 1.12.0 Python 2 and 3 compatibility utilities sqlparse 0.2.4 Non-validating SQL parser termcolor 1.1.0 ANSII Color formatting for output in terminal. toml 0.10.0 Python Library for Tom's Obvious, Minimal Language traitlets 4.3.2 Traitlets Python config system twisted 19.7.0 An asynchronous networking framework written in Python txaio 18.8.1 Compatibility API between asyncio/Twisted/Trollius typed-ast 1.4.0 a fork of Python 2 and 3 ast modules with type comment support typing 3.7.4 Type Hints for Python typing-extensions 3.7.4 Backported and Experimental Type Hints for Python 3.5+ uritemplate 3.0.0 URI templates urllib3 1.24.1 HTTP library with thread-safe connection pooling, file post, and more. uvicorn 0.8.6 The lightning-fast ASGI server. uvloop 0.12.2 Fast implementation of asyncio event loop on top of libuv watchdog 0.9.0 Filesystem events monitoring wcwidth 0.1.7 Measures number of Terminal column cells of wide-character codes websockets 7.0 An implementation of the WebSocket Protocol (RFC 6455 & 7692) wrapt 1.11.1 Module for decorators, wrappers and monkey patching. zope.interface 4.6.0 Interfaces for Python ```
carltongibson commented 5 years ago

Hi @sbdchd. Good report. Thank you.

Don't suppose you can experiment in the debugger can you? What happens if you increase timeout? Where inside get_user() are we loosing the time?

sbdchd commented 5 years ago

I dug into this a bit more.

Increasing timeouts to exorbitant amounts e.g., 500 seconds didn't make a difference.

However, when replacing get_user() with a barebones version shown below, the issue still occurred, although it became more intermittent.

By adding the following diff and just running poetry run pytest mysite a couple times, the error should occur.

Seems like the TimeoutErrors are related to do @database_sync_to_async

EDIT: using @sync_to_async instead of @database_sync_to_async also causes the TimeoutError

diff --git a/mysite/chat/consumers.py b/mysite/chat/consumers.py
index 42fd816..a24ba1f 100644
--- a/mysite/chat/consumers.py
+++ b/mysite/chat/consumers.py
@@ -1,8 +1,12 @@
 from channels.generic.websocket import AsyncJsonWebsocketConsumer
-from channels.auth import get_user
+from channels.db import database_sync_to_async

 GROUP_NAME = "FOO_GROUP"

+@database_sync_to_async
+def get_user(scope):
+    return None
+
 class ChatConsumer(AsyncJsonWebsocketConsumer):
     async def connect(self):
         await self.accept()
poetry run pytest mysite ``` ========================================= test session starts ========================================= platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0 Django settings: mysite.settings (from ini file) rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini plugins: django-3.5.1, asyncio-0.10.0 collected 1 item mysite/chat/test_consumers.py F [100%] ============================================== FAILURES =============================================== ____________________________________________ test_consumer ____________________________________________ self = , timeout = 1 async def receive_output(self, timeout=1): """ Receives a single message from the application, with optional timeout. """ # Make sure there's not an exception to raise from the task if self.future.done(): self.future.result() # Wait and receive the message try: async with async_timeout(timeout): > return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/testing.py:75: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. """ while self.empty(): getter = self._loop.create_future() self._getters.append(getter) try: > await getter E concurrent.futures._base.CancelledError /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError During handling of the above exception, another exception occurred: @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_consumer(): # 1. setup connections communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a")) connected, _ = await communicator.connect() assert connected comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) connected, _ = await comm_a.connect() assert connected comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) connected, _ = await comm_b.connect() assert connected # 3. subscribe to events subscribe = {"foo": "bar"} await communicator.send_json_to(subscribe) await comm_a.send_json_to(subscribe) await comm_b.send_json_to(subscribe) # 3. publish events channel_layer = get_channel_layer() await channel_layer.group_send( GROUP_NAME, { 'type': 'broadcast_message', } ) # 4. assert on received events expected_broadcast_msg = {"broadcast": "important update"} > res = await communicator.receive_json_from() mysite/chat/test_consumers.py:56: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from payload = await self.receive_from(timeout) .venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from response = await self.receive_output(timeout) .venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output raise e .venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__ self._do_exit(exc_type) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exc_type = def _do_exit(self, exc_type: Type[BaseException]) -> None: if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None > raise asyncio.TimeoutError E concurrent.futures._base.TimeoutError .venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError ========================================== 1 failed in 1.63s ========================================== Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x106e43848> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x106ea60c8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> ```
carltongibson commented 5 years ago

Increasing timeouts to exorbitant amounts e.g., 500 seconds didn't make a difference.

Grrr. Looks like a deadlock. We're being scheduled in a TreadPoolExecutor that is never getting to run. TBH not sure why that is.

(How's pytest.asyncio setting things up? — is my first thought.)

sbdchd commented 5 years ago

I added timeouts of 10 seconds and ran the tests under cProfile and it seems the tests are stuck at select.kqueue


         1433004 function calls (1400889 primitive calls) in 11.716 seconds

   Ordered by: internal time
   List reduced from 8069 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      125   10.022    0.080   10.022    0.080 {method 'control' of 'select.kqueue' objects}
      903    0.123    0.000    0.123    0.000 {built-in method marshal.loads}
    12049    0.092    0.000    0.092    0.000 {built-in method posix.lstat}
     5989    0.076    0.000    0.076    0.000 {built-in method posix.stat}
3273/3166    0.058    0.000    0.197    0.000 {built-in method builtins.__build_class__}
      903    0.034    0.000    0.044    0.000 <frozen importlib._bootstrap_external>:914(get_data)
    39/38    0.025    0.001    0.027    0.001 {built-in method _imp.create_dynamic}
 1459/335    0.025    0.000    0.075    0.000 sre_parse.py:475(_parse)
    12423    0.023    0.000    0.037    0.000 posixpath.py:75(join)
      303    0.023    0.000    0.023    0.000 {function SQLiteCursorWrapper.execute at 0x1107bc510}
62666/62620    0.022    0.000    0.023    0.000 {built-in method builtins.getattr}
       10    0.022    0.002    0.022    0.002 {built-in method fcntl.fcntl}
   110609    0.022    0.000    0.024    0.000 {built-in method builtins.isinstance}
64727/64706    0.021    0.000    0.022    0.000 {built-in method builtins.hasattr}
      285    0.017    0.000    0.017    0.000 {built-in method posix.listdir}
     1674    0.017    0.000    0.102    0.000 <frozen importlib._bootstrap_external>:1356(find_spec)
      925    0.017    0.000    0.020    0.000 sre_compile.py:276(_optimize_charset)
     1056    0.016    0.000    0.160    0.000 posixpath.py:400(_joinrealpath)
      151    0.014    0.000    0.014    0.000 {built-in method builtins.compile}
     2336    0.013    0.000    0.021    0.000 posixpath.py:338(normpath)
     1639    0.013    0.000    0.013    0.000 {method 'search' of 're.Pattern' objects}
    25810    0.013    0.000    0.013    0.000 sre_parse.py:233(__next)
 2443/308    0.012    0.000    0.037    0.000 sre_compile.py:71(_compile)
     2675    0.012    0.000    0.019    0.000 pathlib.py:62(parse_parts)
      917    0.011    0.000    0.011    0.000 {method 'read' of '_io.FileIO' objects}
    37081    0.010    0.000    0.010    0.000 {method 'startswith' of 'str' objects}
    85693    0.010    0.000    0.010    0.000 {method 'append' of 'list' objects}
     1038    0.010    0.000    0.202    0.000 <frozen importlib._bootstrap>:882(_find_spec)
     8401    0.010    0.000    0.011    0.000 {built-in method __new__ of type object at 0x10dace030}
      793    0.009    0.000    0.014    0.000 __init__.py:133(__init__)
    12049    0.009    0.000    0.103    0.000 posixpath.py:168(islink)
      903    0.009    0.000    0.207    0.000 <frozen importlib._bootstrap_external>:793(get_code)
      829    0.008    0.000    0.069    0.000 rewrite.py:142(_early_rewrite_bailout)
      518    0.008    0.000    0.014    0.000 __init__.py:398(deconstruct)
 2854/766    0.008    0.000    0.011    0.000 sre_parse.py:174(getwidth)
     9557    0.008    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:56(_path_join)
  5311/96    0.008    0.000    0.018    0.000 copy.py:132(deepcopy)
 1260/326    0.007    0.000    0.963    0.003 <frozen importlib._bootstrap>:978(_find_and_load)
    22199    0.007    0.000    0.018    0.000 sre_parse.py:254(get)
    44480    0.007    0.000    0.008    0.000 {method 'get' of 'dict' objects}
      401    0.007    0.000    0.015    0.000 functional.py:125(__prepare_class__)
18325/18261    0.007    0.000    0.009    0.000 {method 'join' of 'str' objects}
    11516    0.007    0.000    0.007    0.000 {method 'match' of 're.Pattern' objects}
     9557    0.007    0.000    0.010    0.000 <frozen importlib._bootstrap_external>:58(<listcomp>)
    15760    0.007    0.000    0.010    0.000 sre_parse.py:164(__getitem__)
       95    0.007    0.000    0.007    0.000 {built-in method posix.open}
     1816    0.006    0.000    0.015    0.000 <frozen importlib._bootstrap_external>:271(cache_from_source)
      332    0.006    0.000    0.006    0.000 base.py:48(subclass_exception)
     9544    0.006    0.000    0.013    0.000 ast.py:184(iter_child_nodes)
    33897    0.006    0.000    0.006    0.000 {built-in method posix.fspath}
carltongibson commented 5 years ago

Hey @andrewgodwin. Can I ask if you have any thoughts here? (Maybe you’ve seen this kind of thing before?)

I’m a bit like 😳 as to how to advise. Thanks.

andrewgodwin commented 5 years ago

It's worth trying it on an older asgiref version - let's say 3.1.4 - and see if it persists there. I had someone say they might have a potential problem a bit like this with 3.2 but it was un-replicatable.

sbdchd commented 5 years ago

Okay, I tested the following versions of asgiref and they exhibit the TimeoutError:

andrewgodwin commented 5 years ago

OK, then it's not asgiref, which is both good and bad.

Have you added logging to the stub get_user to see if it's getting called at all? Given you have three different consumers running at once, it's almost certainly a deadlock that's occurring, maybe due to the ThreadPoolExecutor being exhausted, but it should have a good 8-10 threads by default (and we have some anti-deadlock tests for that code already).

The best thing to do would be to log every step into and out of the get_user function, and the execution of the lines above and below the await, to work out exactly where it is getting stuck and what's run at that point. Tracebacks tend not to be very accurate when asyncio is involved.

sbdchd commented 5 years ago

Thanks for the pointers.

I've added logging to the tests and the consumer but the logging seems to paint a similar picture as the traceback.

INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!!
INFO     /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm
Full traceback ``` ❯ poetry run pytest -o log_cli=true mysite ========================================= test session starts ========================================= platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0 Django settings: mysite.settings (from ini file) rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini plugins: django-3.5.1, asyncio-0.10.0 collected 1 item mysite/chat/test_consumers.py::test_consumer -------------------------------------------- live log call -------------------------------------------- INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1 INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm FAILED [100%] ============================================== FAILURES =============================================== ____________________________________________ test_consumer ____________________________________________ self = , timeout = 1 async def receive_output(self, timeout=1): """ Receives a single message from the application, with optional timeout. """ # Make sure there's not an exception to raise from the task if self.future.done(): self.future.result() # Wait and receive the message try: async with async_timeout(timeout): > return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/testing.py:75: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. """ while self.empty(): getter = self._loop.create_future() self._getters.append(getter) try: > await getter E concurrent.futures._base.CancelledError /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError During handling of the above exception, another exception occurred: @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_consumer(): # 1. setup connections communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a")) connected, _ = await communicator.connect() assert connected comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) connected, _ = await comm_a.connect() assert connected comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) connected, _ = await comm_b.connect() assert connected log.info("about to subscribe") # 3. subscribe to events subscribe = {"foo": "bar"} log.info("sub:comm1") await communicator.send_json_to(subscribe) log.info("sub:comm_a") await comm_a.send_json_to(subscribe) log.info("sub:comm_b") await comm_b.send_json_to(subscribe) # 3. publish events channel_layer = get_channel_layer() log.info("pub") await channel_layer.group_send( GROUP_NAME, { 'type': 'broadcast_message', } ) # 4. assert on received events expected_broadcast_msg = {"broadcast": "important update"} log.info("about to recv comm") > res = await communicator.receive_json_from() mysite/chat/test_consumers.py:68: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from payload = await self.receive_from(timeout) .venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from response = await self.receive_output(timeout) .venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output raise e .venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__ self._do_exit(exc_type) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exc_type = def _do_exit(self, exc_type: Type[BaseException]) -> None: if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None > raise asyncio.TimeoutError E concurrent.futures._base.TimeoutError .venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError ---------------------------------------- Captured stderr setup ---------------------------------------- Creating test database for alias 'default'... ------------------------------------------ Captured log call ------------------------------------------ INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1 INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm -------------------------------------- Captured stderr teardown --------------------------------------- Destroying test database for alias 'default'... ========================================== warnings summary =========================================== mysite/chat/test_consumers.py::test_consumer /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/generic/websocket.py:259: RuntimeWarning: coroutine 'get_user' was never awaited await self.receive_json(await self.decode_json(text_data), **kwargs) -- Docs: https://docs.pytest.org/en/latest/warnings.html ==================================== 1 failed, 1 warnings in 1.71s ==================================== Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x105f84c48> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x105ff9248> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> ```
full trackback with `PYTHONASYNCIODEBUG=1` ``` ❯ PYTHONASYNCIODEBUG=1 poetry run pytest -o log_cli=true mysite ============================================================================================= test session starts ============================================================================================== platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0 Django settings: mysite.settings (from ini file) rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini plugins: django-3.5.1, asyncio-0.10.0 collected 1 item mysite/chat/test_consumers.py::test_consumer ------------------------------------------------------------------------------------------------ live log call ------------------------------------------------------------------------------------------------- INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1 INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm FAILED [100%] =================================================================================================== FAILURES =================================================================================================== ________________________________________________________________________________________________ test_consumer _________________________________________________________________________________________________ self = , timeout = 1 async def receive_output(self, timeout=1): """ Receives a single message from the application, with optional timeout. """ # Make sure there's not an exception to raise from the task if self.future.done(): self.future.result() # Wait and receive the message try: async with async_timeout(timeout): > return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/testing.py:75: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. """ while self.empty(): getter = self._loop.create_future() self._getters.append(getter) try: > await getter E concurrent.futures._base.CancelledError /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError During handling of the above exception, another exception occurred: @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_consumer(): # 1. setup connections communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a")) connected, _ = await communicator.connect() assert connected comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) connected, _ = await comm_a.connect() assert connected comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) connected, _ = await comm_b.connect() assert connected log.info("about to subscribe") # 3. subscribe to events subscribe = {"foo": "bar"} log.info("sub:comm1") await communicator.send_json_to(subscribe) log.info("sub:comm_a") await comm_a.send_json_to(subscribe) log.info("sub:comm_b") await comm_b.send_json_to(subscribe) # 3. publish events channel_layer = get_channel_layer() log.info("pub") await channel_layer.group_send( GROUP_NAME, { 'type': 'broadcast_message', } ) # 4. assert on received events expected_broadcast_msg = {"broadcast": "important update"} log.info("about to recv comm") > res = await communicator.receive_json_from() mysite/chat/test_consumers.py:68: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from payload = await self.receive_from(timeout) .venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from response = await self.receive_output(timeout) .venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output raise e .venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__ self._do_exit(exc_type) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = , exc_type = def _do_exit(self, exc_type: Type[BaseException]) -> None: if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None > raise asyncio.TimeoutError E concurrent.futures._base.TimeoutError .venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError -------------------------------------------------------------------------------------------- Captured stderr setup --------------------------------------------------------------------------------------------- Creating test database for alias 'default'... ---------------------------------------------------------------------------------------------- Captured log call ----------------------------------------------------------------------------------------------- INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:43 about to subscribe INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:47 sub:comm1 INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:49 sub:comm_a INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:51 sub:comm_b INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:56 pub INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:26 about to call get_user INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/consumers.py:consumers.py:28 finished calling get_user!!! INFO /Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py:test_consumers.py:67 about to recv comm ------------------------------------------------------------------------------------------- Captured stderr teardown ------------------------------------------------------------------------------------------- Destroying test database for alias 'default'... =============================================================================================== warnings summary =============================================================================================== mysite/chat/test_consumers.py::test_consumer /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/generic/websocket.py:259: RuntimeWarning: coroutine 'get_user' was never awaited await self.receive_json(await self.decode_json(text_data), **kwargs) -- Docs: https://docs.pytest.org/en/latest/warnings.html ======================================================================================== 1 failed, 1 warnings in 1.69s ========================================================================================= Task was destroyed but it is pending! source_traceback: Object created at (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in sys.exit(main()) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main return config.hook.pytest_cmdline_main(config=config) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main return wrap_session(config, _main) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session session.exitstatus = doit(config, session) or 0 File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main config.hook.pytest_runtestloop(session=session) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol runtestprotocol(item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol reports.append(call_and_report(item, "call", log)) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report call = call_runtest_hook(item, when, **kwds) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call result = func() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call item.runtest() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest self.ihook.pytest_pyfunc_call(pyfuncitem=self) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call pyfuncitem.obj(**testargs), loop=event_loop)) File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete self.run_forever() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever self._run_once() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once handle._run() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run self._context.run(self._callback, *self._args) File "/Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py", line 35, in test_consumer comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/websocket.py", line 26, in __init__ super().__init__(application, self.scope) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py", line 21, in __init__ self.application(scope, self.input_queue.get, self.output_queue.put) task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for= created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py:21> Exception ignored in: .new_application at 0x1058704c8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! source_traceback: Object created at (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in sys.exit(main()) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main return config.hook.pytest_cmdline_main(config=config) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main return wrap_session(config, _main) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session session.exitstatus = doit(config, session) or 0 File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main config.hook.pytest_runtestloop(session=session) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol runtestprotocol(item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol reports.append(call_and_report(item, "call", log)) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report call = call_runtest_hook(item, when, **kwds) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call result = func() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call item.runtest() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest self.ihook.pytest_pyfunc_call(pyfuncitem=self) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call pyfuncitem.obj(**testargs), loop=event_loop)) File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete self.run_forever() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever self._run_once() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once handle._run() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run self._context.run(self._callback, *self._args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application return await instance(receive, send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ return await self.inner(receive, self.send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call await inner_instance(receive, send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ [receive, self.channel_receive], self.dispatch File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 53, in await_many_dispatch tasks[i] = asyncio.ensure_future(consumer_callables[i]()) task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py:53> Task was destroyed but it is pending! source_traceback: Object created at (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in sys.exit(main()) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main return config.hook.pytest_cmdline_main(config=config) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main return wrap_session(config, _main) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session session.exitstatus = doit(config, session) or 0 File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main config.hook.pytest_runtestloop(session=session) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol runtestprotocol(item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol reports.append(call_and_report(item, "call", log)) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report call = call_runtest_hook(item, when, **kwds) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call result = func() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call item.runtest() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest self.ihook.pytest_pyfunc_call(pyfuncitem=self) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call pyfuncitem.obj(**testargs), loop=event_loop)) File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete self.run_forever() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever self._run_once() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once handle._run() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run self._context.run(self._callback, *self._args) File "/Users/steve/Downloads/channels_tut/mysite/chat/test_consumers.py", line 39, in test_consumer comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/testing/websocket.py", line 26, in __init__ super().__init__(application, self.scope) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py", line 21, in __init__ self.application(scope, self.input_queue.get, self.output_queue.put) task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for= created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/testing.py:21> Exception ignored in: .new_application at 0x1059f23c8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task was destroyed but it is pending! source_traceback: Object created at (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/bin/pytest", line 10, in sys.exit(main()) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/config/__init__.py", line 77, in main return config.hook.pytest_cmdline_main(config=config) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 228, in pytest_cmdline_main return wrap_session(config, _main) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 191, in wrap_session session.exitstatus = doit(config, session) or 0 File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 235, in _main config.hook.pytest_runtestloop(session=session) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/main.py", line 256, in pytest_runtestloop item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 72, in pytest_runtest_protocol runtestprotocol(item, nextitem=nextitem) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 87, in runtestprotocol reports.append(call_and_report(item, "call", log)) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 167, in call_and_report call = call_runtest_hook(item, when, **kwds) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in call_runtest_hook lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 220, in from_call result = func() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 192, in lambda: ihook(item=item, **kwds), when=when, reraise=reraise File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/runner.py", line 117, in pytest_runtest_call item.runtest() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/_pytest/python.py", line 1423, in runtest self.ihook.pytest_pyfunc_call(pyfuncitem=self) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/hooks.py", line 289, in __call__ return self._hookexec(self, self.get_hookimpls(), kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 87, in _hookexec return self._inner_hookexec(hook, methods, kwargs) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/manager.py", line 81, in firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pluggy/callers.py", line 187, in _multicall res = hook_impl.function(*args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/pytest_asyncio/plugin.py", line 158, in pytest_pyfunc_call pyfuncitem.obj(**testargs), loop=event_loop)) File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete self.run_forever() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 539, in run_forever self._run_once() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1767, in _run_once handle._run() File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run self._context.run(self._callback, *self._args) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application return await instance(receive, send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ return await self.inner(receive, self.send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call await inner_instance(receive, send) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ [receive, self.channel_receive], self.dispatch File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 53, in await_many_dispatch tasks[i] = asyncio.ensure_future(consumer_callables[i]()) task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440] created at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py:53> ```
andrewgodwin commented 5 years ago

So, I've replicated this enough to work out it's a side effect specific to channels_redis. If I replace the broadcast mechanism with something else, it functions fine.

andrewgodwin commented 5 years ago

OK, so in my digging, this looks like it's related to the channels_redis mechanism which tries to share connections across coroutines in the same process. That code wasn't super amazing before, and something about what sync_to_async is doing is upsetting it and causing the entire tower of cards to fall over.

@sbdchd I know this isn't a very helpful suggestion, but if you can eliminate your use of channels_redis you will stop encountering this problem. I know that's probably not a realistic option, but if you were using it as a bridge until you invested in a real message bus, maybe now is the time.

I don't have the mental space to go digging into the channels_redis connection-sharing code right now, as I barely understood it as I wrote it, and it's honestly a lot of effort to solve the small problem of connection sharing and frankly the whole thing should probably be torn down for something simpler but slightly less efficient if nobody is able to debug issues like this!

carltongibson commented 5 years ago

Hey @andrewgodwin. Thanks for the work. Do you have a reproduce set-up that demonstrates the issue? (Maybe you don't... 😬)

andrewgodwin commented 5 years ago

Yup, I cloned the demo repo above and set it up and it fails consistently.

carltongibson commented 5 years ago

Well that’s easy... 😀

I kind of meant that highlights the exact pain point, but don’t worry. Thank you for looking into it.

carltongibson commented 5 years ago

Okay, I kind of see how it might come up with the channel_redis connection pool. I’ll have a play and see if I can pin it down.

sbdchd commented 5 years ago

I changed the config to use rabbitmq via https://github.com/CJWorkbench/channels_rabbitmq/ however the tests still have issues with timeout errors.

It's possible that I don't have rabbitmq configured properly.

poetry run pytest mysite output and stack trace ``` ❯ poetry run pytest mysite ========================================= test session starts ========================================= platform darwin -- Python 3.7.3, pytest-5.1.1, py-1.8.0, pluggy-0.12.0 Django settings: mysite.settings (from ini file) rootdir: /Users/steve/Downloads/channels_tut, inifile: tox.ini plugins: django-3.5.1, asyncio-0.10.0 collected 1 item mysite/chat/test_consumers.py F [100%] ============================================== FAILURES =============================================== ____________________________________________ test_consumer ____________________________________________ self = , timeout = 1 async def receive_output(self, timeout=1): """ Receives a single message from the application, with optional timeout. """ # Make sure there's not an exception to raise from the task if self.future.done(): self.future.result() # Wait and receive the message try: async with async_timeout(timeout): > return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/testing.py:75: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. """ while self.empty(): getter = self._loop.create_future() self._getters.append(getter) try: > await getter E concurrent.futures._base.CancelledError /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py:159: CancelledError During handling of the above exception, another exception occurred: @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_consumer(): # 1. setup connections communicator = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="a")) connected, _ = await communicator.connect() assert connected comm_a = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="b")) connected, _ = await comm_a.connect() assert connected comm_b = WebsocketCommunicator(application, "/ws/", headers=await get_headers(name="c")) connected, _ = await comm_b.connect() assert connected # 3. subscribe to events subscribe = {"foo": "bar"} await communicator.send_json_to(subscribe) await comm_a.send_json_to(subscribe) await comm_b.send_json_to(subscribe) # 3. publish events channel_layer = get_channel_layer() await channel_layer.group_send( GROUP_NAME, { 'type': 'broadcast_message', } ) # 4. assert on received events expected_broadcast_msg = {"broadcast": "important update"} > res = await communicator.receive_json_from() mysite/chat/test_consumers.py:56: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .venv/lib/python3.7/site-packages/channels/testing/websocket.py:93: in receive_json_from payload = await self.receive_from(timeout) .venv/lib/python3.7/site-packages/channels/testing/websocket.py:72: in receive_from response = await self.receive_output(timeout) .venv/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output raise e .venv/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output return await self.output_queue.get() .venv/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__ self._do_exit(exc_type) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exc_type = def _do_exit(self, exc_type: Type[BaseException]) -> None: if exc_type is asyncio.CancelledError and self._cancelled: self._cancel_handler = None self._task = None > raise asyncio.TimeoutError E concurrent.futures._base.TimeoutError .venv/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError ------------------------------------------ Captured log call ------------------------------------------ WARNING aioamqp.protocol:protocol.py:190 only PLAIN login_method is supported, falling back to AMQPLAIN ========================================== 1 failed in 1.64s ========================================== Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Task was destroyed but it is pending! task: .new_application() running at /Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py:34> wait_for=> Exception ignored in: .new_application at 0x105d31ac8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task exception was never retrieved future: exception=ChannelClosed(0, 'Channel is closed')> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py", line 162, in receive return await connection.receive(channel) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 780, in receive return await self._incoming_messages.get(asgi_channel) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 283, in get item = await self._out[asgi_channel].get() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 126, in get await getter # raises ChannelClosed aioamqp.exceptions.ChannelClosed: (0, 'Channel is closed') Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> Exception ignored in: .new_application at 0x105ee72c8> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/asgiref/compatibility.py", line 34, in new_application File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__ File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels/utils.py", line 57, in await_many_dispatch File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 688, in call_soon File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 480, in _check_closed RuntimeError: Event loop is closed Task exception was never retrieved future: exception=ChannelClosed(0, 'Channel is closed')> Traceback (most recent call last): File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/core.py", line 162, in receive return await connection.receive(channel) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 780, in receive return await self._incoming_messages.get(asgi_channel) File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 283, in get item = await self._out[asgi_channel].get() File "/Users/steve/Downloads/channels_tut/.venv/lib/python3.7/site-packages/channels_rabbitmq/connection.py", line 126, in get await getter # raises ChannelClosed aioamqp.exceptions.ChannelClosed: (0, 'Channel is closed') Task was destroyed but it is pending! task: wait_for= cb=[_wait.._on_completion() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:440]> ```

edit: I pushed my changes to the rabbitmq-layer branch on https://github.com/sbdchd/django-channels-bug-repo

andrewgodwin commented 5 years ago

That channel layer also uses a very complicated asyncio-based setup to share connections from a single process. Ideally asgiref would not disrupt whatever mechanism they're both using to work, but it needs to be pinned down to the exact failing call first before I can work out if I can make things compatible, unfortunately.

ghost commented 4 years ago

I can comfirm use InMemoryChannelLayer also cause the problem.

kirankumbhar commented 2 years ago

I'm also facing this issue while testing, And I'm not using channels_redis in consumer.

I have written custom auth middleware and wrapped it up around URLRouter. My middleware is querying database to validate the token using @database_sync_to_async. Which is triggering TimeoutError

Middleware:


@database_sync_to_async
def get_user(token: bytes):
    #validate token and return user or return AnonymousUser
     pass

class WebSocketAuthMiddleware:
    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    def get_headers(self, scope) -> Dict[bytes, bytes]:
        return {h[0]: h[1] for h in scope.get("headers", ())}

    async def __call__(self, scope, receive, send):
        headers = self.get_headers(scope)
        scope["user"] = await get_user(headers.get(b"token", b""))

Asgi:

application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": WebSocketAuthMiddleware(URLRouter(websocket_urlpatterns)),
    }
)

Let me know if you require a sample project to test this out. I'll upload it to Github and link it here.

libraries and versions: channels: 3.0.4 django: 4.0.1 pytest: 6.2.5 pytest-django: 4.5.2 pytest-asycnio: 0.17.2

Tested on: Mac OS Catalina 10.15.7 with python 3.8.6