django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Failing test when calling django orm code wrapped in database_sync_to_async #1091

Closed tomek-rej closed 2 months ago

tomek-rej commented 6 years ago

I'm trying to test my Channels consumer which calls database_sync_to_async code. The consumer looks something like this:

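from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer


class MyConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        my_obj = await self.get_obj()
        # ... other code

    @database_sync_to_async
    def get_obj(self):
        # MyModel and filter_condition stand in for the real model and lookup.
        return MyModel.objects.get(filter_condition)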

The test is using the @pytest.mark.asyncio and @pytest.mark.django_db decorators, i.e.:

import pytest
from channels.testing import WebsocketCommunicator


@pytest.mark.asyncio
@pytest.mark.django_db
async def test_heartbeat():
    communicator = WebsocketCommunicator(MyConsumer, '<path>')
    await communicator.connect()
    await communicator.disconnect()

I'm using the following command to run the test:

./manage.py test xxx/tests.py::test_heartbeat

The test itself passes, however at the end of the test run I always get the following error:

=============================================== ERRORS ===============================================
________________________________ ERROR at teardown of test_heartbeat _________________________________

self = <django.db.backends.utils.CursorWrapper object at 0x7fbc7b8d80b8>, sql = 'DROP DATABASE "test"'
params = None
ignored_wrapper_args = (False, {'connection': <django.db.backends.postgresql.base.DatabaseWrapper object at 0x7fbc7b0481d0>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fbc7b8d80b8>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
>               return self.cursor.execute(sql)
E               psycopg2.OperationalError: database "test" is being accessed by other users
E               DETAIL:  There is 1 other session using the database.

I can make the test failure go away by removing all references to database_sync_to_async in the consumer, but my understanding is that it is poor practice to have sync code (like calls to the Django ORM) running inside an async function.

Strangely when I get the failing test two tests run (one pass and one fail), but when I remove the references to database_sync_to_async only one test runs.

Here are the versions of my libraries:

django==2.0.6
daphne==2.2.0
asgiref==2.3.2
channels-redis==2.2.1
pytest==3.6.1
pytest-asyncio==0.8.0
pytest-django==3.1.2
andrewgodwin commented 6 years ago

I'm guessing that the pytest.mark.django_db decorator from pytest-django is probably interacting with Channels here. What happens if you remove that?

tomek-rej commented 6 years ago

I removed the pytest.mark.django_db decorator and removed all calls to the django orm and the test passed. Then I brought back the decorator, but didn't reinstate any calls to the orm. The test still passed. Finally I brought back the call to the orm, and the test failed.

So the tests can pass if the decorator is there, but only if no django orm calls are made.

Also if I remove the pytest.mark.django_db and try to call the orm I get the error:

E       Failed: Database access not allowed, use the "django_db" mark, or the "db" or "transactional_db" fixtures to enable it.

so I just assumed decorator needs to be there if I want my tests to make any orm calls.

andrewgodwin commented 6 years ago

Hm. Then yes, this is a bad interaction between pytest-django and Channels, as they both do things to try to make the ORM function properly outside of a request. Try just the db mark, that might do less?

tomek-rej commented 6 years ago

If I used the pytest.mark.db instead of pytest.mark.django_db I got this error:

E       Failed: Database access not allowed, use the "django_db" mark, or the "db" or "transactional_db" fixtures to enable it.

So instead I tried to use the db fixture as the alternative as suggested in the error. Here is some minimal code which still fails:

from channels.db import database_sync_to_async
import pytest
from accounts.models import User

@database_sync_to_async
def get_user():
    return User.objects.first()

@pytest.mark.asyncio
@pytest.fixture()
async def test_x(db):
    await get_user()

I get the same error as before:

E               django.db.utils.OperationalError: database "test" is being accessed by other users
E               DETAIL:  There is 1 other session using the database.

Now if I remove the call to @database_sync_to_async the test passes:

from channels.db import database_sync_to_async
import pytest
from accounts.models import User

@pytest.mark.asyncio
@pytest.fixture()
async def test_x(db):
    user = User.objects.first()
tomek-rej commented 6 years ago

The good news is I think I found a way to have the test pass. Until now I was just running that individual test case, but I noticed that if I run the whole test suite (which had tests other than this one) the test passes. So I think this issue only arises if this type of test is the last one to run (or is the only one to run).

andrewgodwin commented 6 years ago

Hm, this is going to need some more investigation by someone to figure out if we can make the two cooperate. For now I'd recommend trying to avoid these kinds of tests or figuring out a workaround.

vijayshan commented 6 years ago

@tomek-rej Have you tried setting the transaction parameter on the pytest.mark.django_db decorator to True?

This is a sample test:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_my_consumer_normal():
    user = User.objects.get(username="vijayshan")
    assert user is not None
    user1 = User.objects.get(username="shan")
    assert user1 is not None

    communicator = create_communicator_for_url_and_user("/ws/chat/room1/vijayshan/", user)
    communicator1 = create_communicator_for_url_and_user("/ws/chat/room1/vijayshan1/", user1)
    connected, subprotocol = await communicator.connect()
    connected1, subprotocol = await communicator1.connect()
    assert connected is True
    assert connected1 is True
    room = Room.objects.get(name="room1")
    assert room is not None
    # Test sending text
    message_sent = "hello"
    await communicator.send_to(text_data=json.dumps({"message": message_sent}))
    await confirm_response_for_communicator(communicator, "vijayshan: hello")
    await confirm_response_for_communicator(communicator1, "vijayshan: hello")

    msg = Message.objects.get(message=message_sent)
    assert msg.special_message == False
    assert msg.message == message_sent
    assert msg.room_id == room.id
    assert msg.user_id == user.id

    await communicator.disconnect()
    await communicator1.disconnect()

and this is the consumer:

class ChatConsumer(AsyncWebsocketConsumer):
    """
    ChatConsumer: The chat consumer class is responsible for all chat interactions
    """

    async def connect(self):
        """
        the method that handles the initial connections. the method captures the initial room name
        and generates a room group name. if the scope has a user name then the connection is accepted
        else it is rejected. a unique id for the user is also generated.
        :return:
        """
        self.requires_history = True
        self.room_name = self.scope['url_route']['kwargs']['room_name']

        self.room_group_name = 'chat_%s' % self.room_name

        if("user" in self.scope.keys() and self.scope['user'] is not None):
            self.user_name = self.scope['user'].username

            room, created = await self.retrieve_room_name(self.room_name)
            self.room = room
            self.user = self.scope['user']
            self._id = str(uuid.uuid4())
            # Join room group
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )

            await self.accept()
        else:
            await self.close()

    @database_sync_to_async
    def retrieve_room_name(self, room_name):
        """
        This method retrieves the room name name from the db if it exists else creates one
        :param room_name: the room name that has been requested
        :return: the room name from the db and if it has been created or did it already exist
        """
        room, created = Room.objects.get_or_create(name=room_name)
        return room, created

    def add_message(self, message, is_special=False):
        """
        adds a message to the database
        :param message: the message text to be added
        :param is_special: is this is a special message or not
        :return: return the message object that was just added
        """
        mess = Message(message=message, room=self.room, user=self.user, special_message=is_special)
        mess.save()
        return mess

    async def disconnect(self, close_code):
        """
        Leave the room
        :param close_code:
        :return:
        """
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket

    async def receive(self, text_data=None, bytes_data=None):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]
        if "special_message" in message:
            # Send message to room group

            await self.handle_special_message(message)
        else:
            await self.handle_regular_message(message)

    async def handle_regular_message(self, message):
        message_obj = await database_sync_to_async(self.add_message)(
            message=message
        )
        message = "{}: {}".format(self.user_name, message_obj.message)
        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'is_special': False
            }
        )

    async def handle_special_message(self, message):
        message_obj = await database_sync_to_async(self.add_message)(
            message=message.replace("special_message ", ""),
            is_special=True
        )
        message = "{}: {}".format(self.user_name, message_obj.message)
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'special_message',
                'message': message,
                'id': self._id,
                'is_special': True
            }
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']
        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'type': 'chat_message',
            'message': message,
        }))

    async def special_message(self, event):
        message = event['message']
        id = event['id']
        # Send message to socket
        if self._id != id:
            await self.send(text_data=json.dumps({
                'type': 'chat_message',
                'message': message,
            }))

My tests are passing.

tomek-rej commented 6 years ago

@vijayshan thanks for your feedback. I did try to use transaction=True in my pytest.mark.django_db, but it didn't seem to help. However, I did not try to run your example code exactly, so it's possible there's something specific to my database that's causing those errors.

One thing I noticed: with just a single test case it passes, but as soon as you add more test cases there's a bigger chance that some connections won't be closed properly, resulting in the error message from my original post. Also, in case it helps, there's another issue someone created in the pytest-asyncio GitHub repo that describes exactly my problem: https://github.com/pytest-dev/pytest-asyncio/issues/82. Unfortunately there is no solution there either.

Anyway in my case I came up with a really hacky fix. It involves calling raw sql to close those connections just before the last test ends. I'm using postgres so my code looks like:

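from django.db import connection


def close_db_connections():
    # Find every session connected to the test database (named "test" here).
    sql = "select * from pg_stat_activity where datname = 'test'"
    with connection.cursor() as cursor:
        cursor.execute(sql)
        # Skip the last row, assumed to be this session; row[2] is the pid column.
        for row in cursor.fetchall()[0:-1]:
            cursor.execute(
                'select pg_terminate_backend({}) from pg_stat_activity'.format(
                    row[2]))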

Kind of ugly, I know, but at least now my tests are passing all the time.
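A sketch of a slightly sturdier variant of this hack, which lets Postgres exclude the current session with `pg_backend_pid()` instead of relying on the row order of `pg_stat_activity`. It assumes PostgreSQL 9.2+ (where the column is called `pid`) and a DB-API cursor using the `%s` parameter style, as psycopg2 and Django cursors do; `close_other_db_sessions` is a hypothetical name.

```python
def close_other_db_sessions(cursor, dbname):
    """Terminate every other backend connected to `dbname`.

    Assumes PostgreSQL 9.2+ and a DB-API cursor with the %s paramstyle
    (e.g. the one returned by Django's connection.cursor()).
    """
    # Filter server-side, and never terminate our own session.
    cursor.execute(
        "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
        "WHERE datname = %s AND pid <> pg_backend_pid()",
        [dbname],
    )
    # One boolean per terminated backend.
    return cursor.fetchall()
```

In a teardown this would be called as `with connection.cursor() as cursor: close_other_db_sessions(cursor, "test")`, using whatever name the test database actually has.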

adamhooper commented 5 years ago

Here's my ugly hack:

    def setUp(self):
        super().setUp()

        # We'll execute with a 1-worker thread pool. That's because Django
        # database methods will spin up new connections and never close them.
        # (@database_sync_to_async -- which execute uses --only closes _old_
        # connections, not valid ones.)
        #
        # This hack is just for unit tests: we need to close all connections
        # before the test ends, so we can delete the entire database when tests
        # finish. We'll schedule the "close-connection" operation on the same
        # thread as @database_sync_to_async's blocking code ran on. That way,
        # it'll close the connection @database_sync_to_async was using.
        self._old_loop = asyncio.get_event_loop()
        self.loop = asyncio.new_event_loop()
        self.loop.set_default_executor(ThreadPoolExecutor(1))
        asyncio.set_event_loop(self.loop)

    # Be careful, in these tests, not to run database queries in async blocks.
    def tearDown(self):
        def close_thread_connection():
            # Close the connection that was created by @database_sync_to_async.
            # Assumes we're running in the same thread that ran the database
            # stuff.
            django.db.connections.close_all()

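        # Wait for the close to finish on the single worker thread, then
        # dispose of the test loop (and its executor).
        self.loop.run_until_complete(
            self.loop.run_in_executor(None, close_thread_connection))
        self.loop.close()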

        asyncio.set_event_loop(self._old_loop)

        super().tearDown()

... and then instead of async_to_sync(func)(params), I run self.loop.run_until_complete(func(params)).

This sort of test case should be the exception, not the norm. You can usually structure your code so that each single test either tests scheduling or database queries but not both.
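The reasoning behind this hack (one worker thread, so the close runs on the thread where the connection was opened) can be modeled without Django. In this sketch `FakeConnection`, `query` and `close_thread_connection` are hypothetical stand-ins for the ORM's per-thread connections, and it assumes, as the hack does, that `@database_sync_to_async` submits work to the loop's default executor.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for django.db.connections: one "connection" per thread.
_local = threading.local()
opened = []  # every fake connection ever opened, for inspection


class FakeConnection:
    def __init__(self):
        self.closed = False
        opened.append(self)

    def close(self):
        self.closed = True


def query():
    # Like the ORM: lazily open a connection on the current thread and keep it.
    if getattr(_local, "conn", None) is None:
        _local.conn = FakeConnection()
    return threading.get_ident()


def close_thread_connection():
    # Like connections.close_all(): only affects the calling thread.
    conn = getattr(_local, "conn", None)
    if conn is not None:
        conn.close()
        _local.conn = None


def run_test():
    loop = asyncio.new_event_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
    try:
        async def body():
            # Calls submitted to the default executor (assumed to match
            # what database_sync_to_async did at the time).
            return [await loop.run_in_executor(None, query) for _ in range(3)]

        thread_ids = loop.run_until_complete(body())
        # tearDown: close on the same single worker thread, and wait for it.
        loop.run_until_complete(loop.run_in_executor(None, close_thread_connection))
        return thread_ids
    finally:
        loop.close()
```

With more than one worker, the close job could land on a different thread than the one holding the connection, which is why the hack pins the pool to a single worker.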

jpic commented 5 years ago

Is there a related issue on the pytest-django repo ?

adamhooper commented 5 years ago

Out of curiosity: I see that nobody here has mentioned CONN_MAX_AGE. It occurs to me that if one sets settings.DATABASES['default']['CONN_MAX_AGE']=0, the error in this bug should never happen (because @database_sync_to_async should close its connection each time it's called). CONN_MAX_AGE=0 may be a viable approach for some people.
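A minimal settings sketch of that suggestion, assuming a test-only settings module; the database name is hypothetical. Note that 0 is already Django's default for `CONN_MAX_AGE`, so this only changes anything if the project otherwise enables persistent connections (a positive value or `None`).

```python
# Hypothetical test-only settings module.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "myproject",  # hypothetical database name
        # 0: a connection is considered obsolete as soon as it is checked,
        # so close_old_connections() (which database_sync_to_async calls
        # around each call) closes it instead of keeping it open.
        "CONN_MAX_AGE": 0,
    }
}
```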

andrewgodwin commented 5 years ago

Looks like this got duped into #1234, but I think I may have fixed it in the most recent commit. Testing of that would be welcome.

adamhooper commented 5 years ago

@andrewgodwin No, this is a separate issue. The problem here is how to shut down all database connections -- not just expired ones. Django tries to delete the database at the end of a test run, but Postgres refuses if there are still connections open.

andrewgodwin commented 5 years ago

Oh, right, that old biscuit. Then I guess we'll have to leave this open.

carltongibson commented 5 years ago

@adamhooper Are you able to put together a minimal project that reproduces this? (It would make coming up with a fix a lot quicker if you do. 🙂)

blueyed commented 5 years ago

Just ran into this myself. pytest-django calls teardown_databases, which then triggers this because of an open connection on another thread (via @database_sync_to_async).

  1. using conn_max_age=0 works around it
  2. closing all connections in the @database_sync_to_async wrapped function also works around it

Since this only affects tests when not re-using the DB (--reuse-db in pytest-django), this is not really critical - especially given that --reuse-db (or keepdb in Django) is faster anyway.
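Workaround 2 could look roughly like the decorator below, applied underneath `@database_sync_to_async` so that it runs on the worker thread. `closing_connections` is a hypothetical helper; `close_all` is a parameter only so the sketch runs without Django, and in a real project it would be `django.db.connections.close_all`. It gives up connection reuse, so it is only meant for tests.

```python
import functools


def closing_connections(close_all):
    """Decorator factory: run the wrapped sync function, then close connections.

    In real use, close_all is assumed to be django.db.connections.close_all.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on the same thread as func, so it closes the
                # connection func just opened, even if func raised.
                close_all()
        return wrapper
    return decorator
```

Stacked as `@database_sync_to_async` on top of `@closing_connections(connections.close_all)`, the close happens on the same thread that just ran the query.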

blueyed commented 5 years ago

@carltongibson Minimal is not easy with this kind of issues, but given the details here it is pretty clear: there is an open connection in another thread (caused by channels using threads).
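The thread-local behaviour described here can be illustrated with a toy model: connections are opened lazily per thread, and closing "all" connections only sees the calling thread's. `get_connection` and `close_all` are hypothetical stand-ins for Django's `connections` handler, not its real implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()
opened = []  # every fake connection ever opened


def get_connection():
    # Like django.db.connections[...]: lazily open one connection per thread.
    if not hasattr(_local, "conn"):
        _local.conn = {"thread": threading.current_thread().name, "closed": False}
        opened.append(_local.conn)
    return _local.conn


def close_all():
    # Like connections.close_all(): only sees the *calling* thread's connection.
    conn = getattr(_local, "conn", None)
    if conn is not None:
        conn["closed"] = True
        del _local.conn


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    get_connection()                      # the test's own connection
    pool.submit(get_connection).result()  # what database_sync_to_async does
    close_all()                           # teardown, on the main thread

# The worker's connection survives, which is what blocks DROP DATABASE.
still_open = [c["thread"] for c in opened if not c["closed"]]
```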

blueyed commented 5 years ago

As for --reuse-db / keepdb: it seems like objects created via the threaded method are also persisted when using the non-transactional DB setup, i.e. they are not covered by the outer atomic block that gets rolled back at the end of a test. This basically requires using transactional_db with pytest-django for those tests.
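The persistence effect can be reproduced with two plain `sqlite3` connections standing in for the test's connection and the worker thread's connection. This is an analogy under that assumption, not Django's code: the outer `BEGIN`/`ROLLBACK` plays the role of TestCase's atomic block, and the worker connection is in autocommit mode.

```python
import os
import sqlite3
import tempfile


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite3")
        # isolation_level=None: transactions are controlled manually, like atomic().
        main = sqlite3.connect(path, isolation_level=None)    # the test's connection
        worker = sqlite3.connect(path, isolation_level=None)  # the worker thread's
        try:
            main.execute("CREATE TABLE item (name TEXT)")
            main.execute("BEGIN")  # TestCase's outer atomic block
            # Autocommit on the other connection: committed immediately.
            worker.execute("INSERT INTO item VALUES ('from worker')")
            main.execute("INSERT INTO item VALUES ('from test')")
            main.execute("ROLLBACK")  # end-of-test rollback
            return [row[0] for row in main.execute("SELECT name FROM item")]
        finally:
            main.close()
            worker.close()
```

Only the row written through the test's own connection is rolled back; the worker's row survives, so `demo()` returns `["from worker"]`.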

Without much investigation it seems like channels needs to hook into Django's test setup here better, probably "just" by wrapping connections in threads to ensure they are getting closed in the end, and also to wrap them in atomic blocks (based on the test method being used).

adamhooper commented 5 years ago

I have transitioned our project to a different approach for unit tests: use the main-thread database connection. This makes tests faster since they don't need to disconnect/reconnect (assuming you've configured Django to persist connections -- which is a valid assumption, because if you didn't you wouldn't be seeing this error, right?).

This means ... er ... replacing @database_sync_to_async. I have no compunction about doing this: I don't believe database connections should run on the default thread pool. (This lets you configure the number of database connections your Django app maintains. That's a good thing.)
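A small sketch of why a dedicated pool bounds the number of connections: if each worker thread holds at most one connection, a pool with `max_workers=N` never needs more than N. `N_DB_THREADS` and `fake_query` are hypothetical; the project's own setting for this is `N_SYNC_DATABASE_CONNECTIONS`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

N_DB_THREADS = 2  # hypothetical stand-in for a project setting

# Dedicated pool: each worker thread would hold at most one DB connection,
# so at most N_DB_THREADS connections exist however many calls are made.
db_executor = ThreadPoolExecutor(max_workers=N_DB_THREADS, thread_name_prefix="db")


def fake_query(i):
    time.sleep(0.01)  # pretend to wait on the database
    return threading.current_thread().name


threads_used = set(db_executor.map(fake_query, range(20)))
db_executor.shutdown()
```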

In cjworkbench/sync.py, we have:

import asyncio
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import functools
from channels.db import DatabaseSyncToAsync
from django.conf import settings

class OurDatabaseSyncToAsync(DatabaseSyncToAsync):
    """
    SyncToAsync on a special, database-only threadpool.
    Each thread has zero (on startup) or one (forever) database connection,
    stored in thread-local `django.db.connections[DEFAULT_DB_ALIAS]`.

    There is no way to close the threads' connections.

    This is how Channels' database_sync_to_async _should_ be implemented.
    We don't want ASGI_THREADS-many database connections because they thrash
    the database. Fewer connections means higher throughput. (We don't have any
    long-living SQL transactions; they'd change this calculus.)
    """

    executor = ThreadPoolExecutor(
        max_workers=settings.N_SYNC_DATABASE_CONNECTIONS,
        thread_name_prefix='our-database-sync-to-async-',
    )

    # override
    async def __call__(self, *args, **kwargs):
        # re-implementation of SyncToAsync.__call__, submitting to self.executor
        loop = asyncio.get_event_loop()
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)

        future = loop.run_in_executor(
            self.executor,
            functools.partial(
                self.thread_handler,
                loop,
                self.get_current_task(),
                context.run,
                child
            ),
        )
        return await asyncio.wait_for(future, timeout=None)

# The class is TitleCased, but we want to encourage use as a callable/decorator
database_sync_to_async = OurDatabaseSyncToAsync

In our tests/utils.py file, we have:

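import asyncio
from concurrent.futures import ThreadPoolExecutor

from django.db import connections
from django.test import SimpleTestCase

from cjworkbench.sync import OurDatabaseSyncToAsync

# Connect to the database, on the main thread, and remember that connection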
main_thread_connections = {name: connections[name] for name in connections}

def _inherit_main_thread_connections():
    for name in main_thread_connections:
        connections[name] = main_thread_connections[name]
        connections[name].allow_thread_sharing = True

class DbTestCase(SimpleTestCase):
    allow_database_queries = True

    # run_with_async_db() tasks all share a single database connection. To
    # avoid concurrency issues, run them all in a single thread.
    #
    # Assumes DB connections may be passed between threads. (Only one thread
    # will make DB calls at a time.)
    async_executor = ThreadPoolExecutor(
        max_workers=1,
        thread_name_prefix='run_with_async_db_thread',
        initializer=_inherit_main_thread_connections
    )

    # Don't bother clearing data in tearDown(). The next test that needs the
    # database will be running setUp() anyway, so extra clearing will only cost
    # time.

    def run_with_async_db(self, task):
        """
        Runs async tasks, using the main thread's database connection.
        See
        https://github.com/django/channels/issues/1091#issuecomment-436067763.
        """
        # We'll execute with a 1-worker thread pool, shared between tests. We
        # need to limit to 1 worker, because all workers share the same
        # database connection.
        #
        # This hack is just for unit tests: the test suite will end with a
        # "delete the entire database" call, and we want it to succeed; that
        # means there need to be no other connections using the database.
        old_loop = asyncio.get_event_loop()
        old_executor = OurDatabaseSyncToAsync.executor
        asyncio.set_event_loop(None)
        try:
            OurDatabaseSyncToAsync.executor = self.async_executor
            return asyncio.run(task)
        finally:
            OurDatabaseSyncToAsync.executor = old_executor
            asyncio.set_event_loop(old_loop)

Now:

In my opinion, the separate-thread-pool approach should be part of Channels proper. I'm less certain about how Channels proper should address unit testing.

blueyed commented 5 years ago

@adamhooper Thanks for sharing.

Two remarks:

1. connections[name].allow_thread_sharing = True

   This does not work in Django 2.2 anymore. connections[name].inc_thread_sharing() should probably be used instead?

2. I am seeing an error with asyncio.run(task) in run_with_async_db:

    tests/test_app.py:167: in test_sync_db
    await run_with_async_db(create_request_obj())
    tests/conftest.py:309: in inner
    return asyncio.run(task)
    /usr/lib/python3.7/asyncio/runners.py:34: in run
    "asyncio.run() cannot be called from a running event loop")
    E   RuntimeError: asyncio.run() cannot be called from a running event loop

   asyncio.get_event_loop() still returns the old/current loop after asyncio.set_event_loop(None) for me. Using Python 3.7.

I've got it working with pytest / pytest-asyncio by adjusting the event_loop fixture it provides, but then ran into Channels closing the connections as "old": _enter_atomics sets autocommit to False, which then does not match the settings dict (which close_old_connections compares it with).

  File "…/Vcs/pytest-django/pytest_django/fixtures.py", line 152, in _django_db_fixture_helper
    test_case._pre_setup()
  File "…/Vcs/django/django/test/testcases.py", line 938, in _pre_setup
    self._fixture_setup()
  File "…/Vcs/django/django/test/testcases.py", line 1169, in _fixture_setup
    self.atomics = self._enter_atomics()
  File "…/Vcs/django/django/test/testcases.py", line 1107, in _enter_atomics
    atomics[db_name].__enter__()
  File "…/Vcs/django/django/db/transaction.py", line 201, in __enter__
    connection.set_autocommit(False, force_begin_transaction_with_broken_autocommit=True)
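The check that trips here can be modeled in a few lines. `ToyConnection` is a hypothetical, heavily simplified stand-in for Django's database wrapper; it only mirrors the one rule described above: a connection whose autocommit state no longer matches its `AUTOCOMMIT` setting is treated as stale and closed.

```python
class ToyConnection:
    """Hypothetical, simplified stand-in for a Django database wrapper."""

    def __init__(self):
        self.settings_dict = {"AUTOCOMMIT": True}
        self.autocommit = True
        self.closed = False

    def set_autocommit(self, value):
        self.autocommit = value

    def close_if_unusable_or_obsolete(self):
        # Mirrors one rule only: if autocommit was not restored to the
        # configured value, assume a broken state and drop the connection.
        if self.autocommit != self.settings_dict["AUTOCOMMIT"]:
            self.closed = True


conn = ToyConnection()
conn.set_autocommit(False)            # what entering TestCase's atomic block does
conn.close_if_unusable_or_obsolete()  # what close_old_connections() triggers
```

In a test this is exactly the wrong outcome: the "stale" connection is the one holding the test's transaction.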

To summarize: Your OurDatabaseSyncToAsync is mainly there to use its executor (where workers are controlled through a setting), and which allows to override it for tests, where it then copies the db connections from the main thread. I think that should be improved in asgiref, to use self.executor there (https://github.com/django/asgiref/pull/88).

The following works for me:

sync.py (note that it subclasses SyncToAsync rather than DatabaseSyncToAsync, so that legitimate connections in atomic blocks are not closed):

from concurrent.futures import ThreadPoolExecutor
from channels.db import SyncToAsync
# from channels.db import DatabaseSyncToAsync
from django.conf import settings

class OurDatabaseSyncToAsync(SyncToAsync):
    executor = ThreadPoolExecutor(
        max_workers=settings.N_SYNC_DATABASE_CONNECTIONS,
        thread_name_prefix='our-database-sync-to-async-',
    )

database_sync_to_async = OurDatabaseSyncToAsync

conftest.py:

import pytest


@pytest.fixture(scope="session")
def event_loop(async_db_executor):
    from project.sync import OurDatabaseSyncToAsync
    import asyncio

    loop = asyncio.get_event_loop()

    OurDatabaseSyncToAsync.executor = async_db_executor

    yield loop

    loop.close()

@pytest.fixture(scope="session")
def async_db_executor():
    from django.db import connections
    from concurrent.futures import ThreadPoolExecutor

    # Connect to the database, on the main thread, and remember that connection.
    main_thread_connections = {name: connections[name] for name in connections}

    def _inherit_main_thread_connections():
        for name in main_thread_connections:
            connections[name] = main_thread_connections[name]
            # connections[name].allow_thread_sharing = True
            connections[name].inc_thread_sharing()

    async_executor = ThreadPoolExecutor(
        max_workers=1,
        thread_name_prefix='run_with_async_db_thread',
        initializer=_inherit_main_thread_connections
    )
    return async_executor

test.py:

@pytest.mark.asyncio
async def test_sync_db(db, run_with_async_db):
    from project.sync import database_sync_to_async

    @database_sync_to_async
    def create_obj():
        models.Model.objects.create()

    await create_obj()
    await create_obj()

With regard to close_if_unusable_or_obsolete / close_old_connections I wonder why you are not seeing this: you are not using TransactionTestCase, are you? (but then also not TestCase, which uses the atomic block)

I think for tests it is fine in general to skip closing of old connections, and if not, it should probably filter out/skip those in atomic blocks (which might be good to do in close_if_unusable_or_obsolete already). Created https://code.djangoproject.com/ticket/30448 about it.

blueyed commented 5 years ago

Seems like all of this can be done in DatabaseSyncToAsync: https://github.com/django/channels/pull/1290/files

blueyed commented 5 years ago

Updated https://github.com/django/channels/pull/1290 - waiting for some feedback now.. :)

adamhooper commented 5 years ago

@blueyed Hi :)

Re inc_thread_sharing(): Interesting. We haven't jumped to Django 2 yet, but evidently there's a disparity here. If Channels is to incorporate this idea, it'll either need to implement both methods or discontinue Django 1.11 LTS support (meaning a major version bump).

Re error with asyncio.run(): there seems to be some confusion here. That error message is exactly correct: asyncio.run() cannot be called from a running event loop. My run_with_async_db() calls asyncio.run(); I suspect your code under test also calls asyncio.run(), and you're calling that from within run_with_async_db().

Re executors: my opinion about putting database connections on a separate thread from other sync_to_async() code is probably specific to our project. We use Channels in our workers, in which sync code (outside of @database_sync_to_async()) never accesses the database. In that context, we can actually control the number of database connections -- which is important because one database connection per thread is way too many. I think this is important, but I don't see how to deliver it to django-channels more widely, since request handlers run on the default executor and can access the Django database connections. (If only Django were async all the way through ... sigh.)

adamhooper commented 5 years ago

As for why we don't run into _enter_atomics() issues: we don't run into it because TransactionTestCase is awful and we never use it. (I have strong opinions ;).)

We just DELETE all data from all tables at the start of every test. SimpleTestCase+DELETE FROM is literally faster than TransactionTestCase's ROLLBACK on Postgres. (It's faster than TRUNCATE, too.) SimpleTestCase lets us test COMMIT (which can often throw an error!); and it lets us test concurrency issues.
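A sketch of the "DELETE all data at the start of every test" approach, using SQLite's catalog so it runs standalone. `delete_all_rows` is a hypothetical helper; on Postgres with Django one would list tables differently (for example with `connection.introspection.table_names()`), and foreign-key ordering is not handled here.

```python
import sqlite3


def delete_all_rows(conn):
    """Empty every user table in one transaction; returns the table names."""
    tables = [
        name
        for (name,) in conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
        )
    ]
    with conn:  # commits on success, rolls back on error
        for table in tables:
            conn.execute(f'DELETE FROM "{table}"')
    return tables
```

Called from `setUp()`, this leaves each test starting from empty tables without wrapping the test itself in a transaction, so code under test can still COMMIT normally.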

I don't think anybody should ever use TransactionTestCase.

(I learned the same lesson in Rails, which has an equally awful default.)

blueyed commented 5 years ago

> Re error with asyncio.run(): there seems to be some confusion here. That error message is exactly correct: asyncio.run() cannot be called from a running event loop. My run_with_async_db() calls asyncio.run(); I suspect your code under test also calls asyncio.run(), and you're calling that from within run_with_async_db().

I see. I was using an async def test in the first place, via pytest-asyncio, so there is an event loop already then.
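A minimal reproduction of the error, independent of Channels: inside a coroutine that is already running on a loop (as an `async def` test under pytest-asyncio is), `asyncio.run()` refuses to start a second loop, and the fix is simply to `await` the coroutine.

```python
import asyncio


async def work():
    return 42


async def inside_running_loop():
    coro = work()
    try:
        asyncio.run(coro)  # a loop is already running here
    except RuntimeError as exc:
        return str(exc)
    finally:
        # asyncio.run() refused the coroutine; close it to avoid a
        # "coroutine was never awaited" warning.
        coro.close()


async def correct_way():
    return await work()  # inside a running loop, just await


message = asyncio.run(inside_running_loop())
result = asyncio.run(correct_way())
```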

> As for why we don't run into _enter_atomics() issues: we don't run into it because TransactionTestCase is awful and we never use it. (I have strong opinions ;).)

TransactionTestCase does not use it, but TestCase does - it uses atomic blocks for faster rollback.

> I don't think anybody should ever use TransactionTestCase.

It is not a good default, but is required for when you need to test transactions yourself (and the test should not be wrapped in an atomic block).

adamhooper commented 5 years ago

> I don't think anybody should ever use TransactionTestCase.
>
> It is not a good default, but is required for when you need to test transactions yourself (and the test should not be wrapped in an atomic block).

Indeed -- I got things backwards. TransactionTestCase is the one that doesn't start a transaction; TestCase is the one that does.

I never use either ;).

blueyed commented 5 years ago

For reference: created https://github.com/django/django/pull/11769 to fix close_if_unusable_or_obsolete.

cybergrind commented 4 years ago

I don't suggest this as a final fix. But we were able to bypass this on django 3.1 + channels 2.4 with such fixture:

from unittest import mock

import pytest
from django.db import connections


@pytest.fixture
def fix_async_db():
    local = connections._connections
    ctx = local._get_context_id()
    for conn in connections.all():
        conn.inc_thread_sharing()
    conn = connections.all()[0]
    old = local._get_context_id
    try:
        with mock.patch.object(conn, 'close'):
            object.__setattr__(local, '_get_context_id', lambda: ctx)
            yield
    finally:
        object.__setattr__(local, '_get_context_id', old)

It is specific to the asgi and Channels implementation, and it is neither thread-safe nor asyncio-safe.

rhertzog commented 2 years ago

We're in 2022 and using database_sync_to_async() in TestCase still breaks the tests.

The outcome of this discussion is not clear... can we do something as part of channels? (cf #1290)

Or is this to be fixed only on the Django side as suggested earlier and as requested in https://code.djangoproject.com/ticket/30448 ?

carltongibson commented 2 years ago

Hey @rhertzog — We're in 2022… — TBH if you're wanting to bootstrap this discussion into more action immediately, you're going to need to summarise and suggest a way forward, after ≈2 years of inactivity.

I have a backlog of issues I want to address: happy to consider input here, but it needs to be input.

I hope that makes sense.

cybergrind commented 2 years ago

My 2 cents: we've put a lot of effort into adopting Channels and testing it, and eventually it caused more issues than it solved.

And it is not limited just to testing (which you can effectively hack around): a model that defers work to a thread pool is error-prone and causes deadlocks in production when you have limited CPUs plus lots of parallel nested async => sync => async => sync calls, which you're going to end up with. There were also smaller issues. And all this knowledge is super-specific to Channels and its implementation.

We wanted to have a uniform codebase, but it wasn't that uniform. Testing = PAIN. Different numbers of CPUs locally and in production = the possibility of magical deadlocks you've never seen before.

P.S. I don't insist on not fixing this, but probably this will prevent some over-enthusiastic people from using it in production applications as part of the core architecture. It is totally fine on the small and non-critical scope.

FyZzyss commented 2 years ago

@cybergrind Thank you. This garbage is still broken, so this fix is salvation.

adamchainz commented 1 year ago

I’ve reproduced this problem in a more minimal form with this test repo: https://github.com/adamchainz/channels-bug-connection-closed

The issue only appeared in the client project I was working on when upgrading to asgiref 3.5.1+, which fixed thread-sensitive mode. This fix causes queries from a tested consumer to run on the main thread inside the atomic() from the TestCase.

As a workaround I've added this to the project's test runner class to disable the calls to close_old_connections() in @database_sync_to_async:

        # Channels upstream has a bug where connections get closed during test
        # transactions:
        # https://github.com/django/channels/issues/1091
        # Workaround this by replacing the “maybe close connection” function
        # in channels.db with a no-op
        from channels import db as channels_db

        def no_op():
            pass

        channels_db.close_old_connections = no_op

This issue actually seems to be a repeat of #462. Historically that was fixed in 9ae27cb835763075a81ba3823d20bd68f3ace46b, which made Channels disable its connection-closing logic during tests. Unfortunately this fix seems to have been lost when moving from the testing Client to the newer testing communicators (Client removed in 66bc2981f1c69f35aed9815e5b90d19d5c66387d).

I think a fix should look something like the previous methodology: removing the calls to close_old_connections() during tests.

pcraciunoiu commented 5 months ago

The fix in comment https://github.com/django/channels/issues/1091#issuecomment-701361358 has been very helpful, but it no longer works as of asgiref 3.8, as _get_context_id was removed - https://github.com/django/asgiref/compare/3.7.2...3.8.0

I have not spent time trying to find a different solution, so for now I pinned asgiref below 3.8. However, this downgraded the strawberry-graphql-django package we're using for the API, so it's not going to be good long-term.

Unfortunately, without the fix, the unit tests just hang. And the alternative fix to mark each test as transactional causes unit tests to be very slow - https://github.com/pytest-dev/pytest-asyncio/issues/226#issuecomment-1574102150

Has anyone found a better workaround? Is there a better place to follow up on this issue?

bigfootjon commented 4 months ago

I've put up https://github.com/django/channels/pull/2101 which should fix this issue. I would appreciate it if anyone from this thread can apply these changes locally and see if it fixes the problem.

You can install with pip like so:

pip install git+ssh://git@github.com/bigfootjon/channels.git@tests-no-close-connections
sevdog commented 3 months ago

See also https://code.djangoproject.com/ticket/32409; the problem may be related to how the base sync_to_async handles threads, and newer versions of asgiref and Django have changed this since this issue was created.