django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

How to test sync WebsocketConsumer that queries the database #2043

Open jlariza opened 1 year ago

jlariza commented 1 year ago

Good day,

I have a WebsocketConsumer that queries the database based on a parameter. The code works as expected; however, the tests keep failing with this message: django.db.utils.OperationalError: the connection is closed.

This is the consumer implementation, based on the tutorial:

class MyModelConsumer(WebsocketConsumer):
    def connect(self):
        self.object_id = self.scope["url_route"]["kwargs"]["object_id"]
        self.room_group_name = f"object_{self.object_id}"

        # Join room group
        async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
        if MyModel.objects.filter(id=self.object_id).exists():
            self.accept()
            # Use a separate name so the MyModel class is not shadowed by a local variable
            instance = MyModel.objects.get(id=self.object_id)
            serializer = MyModelSerializer(instance)
            # Send message to room group
            async_to_sync(self.channel_layer.group_send)(
                self.room_group_name, {"type": "MyModel", "MyModel": serializer.data}
            )
        else:
            self.close()

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name)

and this is the test I'm writing:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
class TestMyModelConsumer(TestCase):
    """
    Test class that holds the test cases for the MyModelConsumer class
    """
    @database_sync_to_async
    def create_instance(self):
        return MyModel.objects.create()

    # based on https://channels.readthedocs.io/en/latest/topics/testing.html#websocketcommunicator
    async def test_my_consumer(self):
        instance = await self.create_instance()
        application = URLRouter([
            path("test/<object_id>/", MyModelConsumer.as_asgi()),
        ])
        communicator = WebsocketCommunicator(application, f"test/{instance.id}/")
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)

However, it always raises django.db.utils.OperationalError: the connection is closed at this line: MyModel.objects.create(). The test tries to access the database but cannot, because the database connection is closed.

I tried creating the model instance synchronously in the setUp method; that works, but then the OperationalError is raised at this line: if MyModel.objects.filter(id=self.object_id).exists():

Any ideas of what I'm missing or doing incorrectly?

Thank you,

jlariza commented 1 year ago

I created this repository, using cookiecutter, to reproduce the issue: https://github.com/jlariza/channels_testing_bug

To test:

The consumer works; I already tested it in another project. However, the tests fail because they cannot handle the async connection to the database.

Any ideas of what I'm missing or doing incorrectly?

Thank you,

carltongibson commented 1 year ago

I suspect this is about how pytest manages the connection between tests.

I'd try inlining the model creation in the test case, to see if that makes progress, and then adjust out from there.

jlariza commented 1 year ago

@carltongibson no luck. If you try to run the model creation in the test case directly, it raises django.core.exceptions.SynchronousOnlyOperation, but if you use the database_sync_to_async decorator, the OperationalError is raised again.

I tried creating the model using acreate in the test case, but then the OperationalError is raised again at this line: if MyModel.objects.filter(id=self.object_id).exists():

carltongibson commented 1 year ago

it raises django.core.exceptions.SynchronousOnlyOperation

Yes, you'd need to wrap it in sync_to_async().

The idea is to reduce the number of different contexts so you can identify when the DB connection is being closed.
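To illustrate why the direct call fails and the wrapped call does not, here is a minimal sketch that uses only the standard library. It is not Django or asgiref code: create_instance and the local SynchronousOnlyOperation class are hypothetical stand-ins, and asyncio.to_thread is swapped in for sync_to_async. The guard mimics the idea that synchronous-only ORM code refuses to run in a thread that has a running event loop. Note that sync_to_async differs in detail (for example, its thread_sensitive mode controls which thread the call runs on), so treat this only as a picture of the mechanism.

```python
import asyncio
import threading


class SynchronousOnlyOperation(Exception):
    """Hypothetical stand-in for django.core.exceptions.SynchronousOnlyOperation."""


def create_instance():
    """Hypothetical stand-in for MyModel.objects.create().

    It refuses to run in a thread that has a running event loop,
    which is the kind of check that produces the error in the thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread, so a synchronous call is allowed.
        return {"id": 1, "thread": threading.current_thread().name}
    raise SynchronousOnlyOperation("cannot call this from an async context")


async def demo():
    # Calling the sync function directly inside a coroutine trips the guard.
    try:
        create_instance()
        direct = "ok"
    except SynchronousOnlyOperation:
        direct = "SynchronousOnlyOperation"

    # Running it in a worker thread (which has no event loop) succeeds.
    # asyncio.to_thread is the stdlib stand-in for sync_to_async here.
    wrapped = await asyncio.to_thread(create_instance)
    return direct, wrapped


direct_result, wrapped_result = asyncio.run(demo())
print(direct_result)
print(wrapped_result["id"])
```

In a real test, the equivalent step would be to await the wrapped ORM call, for example with the database_sync_to_async helper already used earlier in this thread.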

carltongibson commented 1 year ago

MyModel.objects.filter(id=self.object_id).exists()

This will be because of the transaction test case usage. I'd guess that, seen from the other transaction, the object isn't saved yet.
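If that guess is right, the effect would look roughly like the sketch below. It uses the standard-library sqlite3 module as a stand-in for the project's database (an assumption; the thread does not say which backend is in use), and the table name mymodel and the count_rows helper are hypothetical. One connection inserts a row without committing, and a second connection cannot see the row until the commit happens.

```python
import os
import sqlite3
import tempfile


def count_rows(conn):
    """Return the number of rows in the hypothetical mymodel table."""
    cur = conn.execute("SELECT COUNT(*) FROM mymodel")
    try:
        return cur.fetchone()[0]
    finally:
        # Close the cursor so the read lock is released right away.
        cur.close()


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "demo.sqlite3")

    # Two independent connections, standing in for the test and the consumer.
    writer = sqlite3.connect(db_path)
    reader = sqlite3.connect(db_path)

    writer.execute("CREATE TABLE mymodel (id INTEGER PRIMARY KEY)")
    writer.commit()

    # The INSERT implicitly opens a transaction on the writer connection.
    writer.execute("INSERT INTO mymodel (id) VALUES (1)")
    seen_before_commit = count_rows(reader)

    # Once the writer commits, the row becomes visible to the reader.
    writer.commit()
    seen_after_commit = count_rows(reader)

    writer.close()
    reader.close()

print(seen_before_commit, seen_after_commit)
```

This only demonstrates transaction isolation in general; whether it is what actually happens in the failing test still needs to be confirmed against the reproduction repository.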