pytest-dev / pytest-asyncio

Asyncio support for pytest
https://pytest-asyncio.readthedocs.io
Apache License 2.0

Warn about event loop changes using Redis/FakeRedis #622

Open M1ha-Shvn opened 1 year ago

M1ha-Shvn commented 1 year ago

Hi. I'm using fakeredis to run tests via pytest-asyncio in a FastAPI application.

Library versions:

fakeredis==2.19.0
fastapi==0.103.1
hiredis==2.2.3
lupa==2.0
pytest-asyncio==0.21.1
redis==5.0.0

Everything works well on Python 3.9 and earlier, but on 3.10 and 3.11 I start getting exceptions:

test setup failed
event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
request = <SubRequest 'storage_clean_up' for <Function test_read_from_cache__cache_save_failure>>
kwargs = {}
setup = <function _wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup at 0x7feb442d5900>

    @functools.wraps(fixture)
    def _async_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )

        async def setup():
            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
            return res

>       return event_loop.run_until_complete(setup())

../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:326: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323: in setup
    res = await func(**_add_kwargs(func, kwargs, event_loop, request))
/opt/project/core/storages/tests/fixtures.py:23: in storage_clean_up
    await cls.storage.truncate()
/opt/project/core/storages/redis.py:285: in truncate
    await client.flushall()
../pip/lib/python3.10/site-packages/redis/asyncio/client.py:545: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:1111: in get_connection
    if await connection.can_read_destructive():
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:472: in can_read_destructive
    return await self._parser.can_read_destructive()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:179: in can_read_destructive
    return await self.read_from_socket()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
../pip/lib/python3.10/site-packages/fakeredis/aioredis.py:83: in read
    return await self._socket.responses.get()  # type:ignore
/usr/local/lib/python3.10/asyncio/queues.py:156: in get
    getter = self._get_loop().create_future()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7feb429a9c60 maxsize=0 tasks=4>

    def _get_loop(self):
        loop = events._get_running_loop()

        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
>           raise RuntimeError(f'{self!r} is bound to a different event loop')
E           RuntimeError: <Queue at 0x7feb429a9c60 maxsize=0 tasks=4> is bound to a different event loop

/usr/local/lib/python3.10/asyncio/mixins.py:30: RuntimeError

If I disable FakeRedis and use redis-py in tests without isolation, I start getting this error:

self = Connection<host=redis,port=6379,db=0>, disable_decoding = False
timeout = None

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: bool = True,
        push_request: Optional[bool] = False,
    ):
        """Read the response from a previously sent command"""
        read_timeout = timeout if timeout is not None else self.socket_timeout
        host_error = self._host_error()
        try:
            if (
                read_timeout is not None
                and self.protocol in ["3", 3]
                and not HIREDIS_AVAILABLE
            ):
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
            elif read_timeout is not None:
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding
                    )
            elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
>               response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )

/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:509: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:203: in read_response
    await self.read_from_socket()
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
/usr/local/lib/python3.10/asyncio/streams.py:669: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport closing fd=11>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           RuntimeError: Task <Task pending name='Task-3' coro=<_wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup() running at /app/pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

/usr/local/lib/python3.10/asyncio/streams.py:501: RuntimeError

During handling of the above exception, another exception occurred:

cls = <core.storages.tests.test_storages.TestRedisStorage object at 0x7f8c542801c0>

    @classmethod
    @pytest_asyncio.fixture(autouse=True)
    async def storage_clean_up(cls):
>       await cls.storage.truncate()

test_storages.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../redis.py:282: in truncate
    await client.flushall()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:550: in execute_command
    return await conn.retry.call_with_retry(
/app/pip/lib/python3.10/site-packages/redis/asyncio/retry.py:59: in call_with_retry
    return await do()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:524: in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:571: in parse_response
    response = await connection.read_response()
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:529: in read_response
    await self.disconnect(nowait=True)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:385: in disconnect
    self._writer.close()  # type: ignore[union-attr]
/usr/local/lib/python3.10/asyncio/streams.py:337: in close
    return self._transport.close()
/usr/local/lib/python3.10/asyncio/selector_events.py:706: in close
    self._loop.call_soon(self._call_connection_lost, None)
/usr/local/lib/python3.10/asyncio/base_events.py:753: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/local/lib/python3.10/asyncio/base_events.py:515: RuntimeError

At first I thought it was a problem in FakeRedis and created an issue there. But now I suspect it is connected with pytest-asyncio, because:

  1. Tests fail with the base redis library too
  2. redis-py does not fail outside of tests: my project works well when I test the server manually

Can you suggest anything, or where to dig further? Thanks

seifertm commented 1 year ago

Both stack traces complain about queues or tasks being attached to different event loops. Two things come to my mind:

  1. The event_loop fixture closes any existing loop and returns a new one. If you have tests that need to reuse the same loop, you need to ensure the correct scope of the event_loop fixture. You can print fixture scopes by running pytest with the option --setup-show.
  2. Async fixtures are pre-processed during the collection phase. They run in their own loop. Trying to reuse a queue from a fixture in a test will trigger the error about the queue being attached to a different loop.

As to why this occurs only from Python 3.10 onwards, I don't know.

M1ha-Shvn commented 1 year ago

Thanks for the comment. I'll have a look.

seifertm commented 1 year ago

@M1ha-Shvn Any updates on your issue?

M1ha-Shvn commented 1 year ago

I guess my problem is a result of my architecture: the Redis storage connection object implements the Singleton pattern. It creates a single object and reuses the Redis instance. For testing purposes it replaces Redis with FakeRedis. When the tests run, each test runs in a separate loop, but the singleton object stays the same (as the Python thread is the same), which leads to loop-change problems when calling Redis commands.

I fixed the problem for myself with two changes:

  1. A hacky solution that regenerates the FakeRedis object when the loop changes:

      current_loop = asyncio.get_event_loop()
      if self._loop is not None and self._loop is not current_loop:
          self._redis_clients.clear()
    
          # Script should be created in same loop, where it is executed
          self._generate_worker_id_script = None
    
          log('asyncio_loop_changed', 'asyncio_loop_changed', level=logging.WARNING)
    
      self._loop = current_loop
  2. Adding a global fixture with a shared event_loop to conftest.py. I'm not exactly sure why, but without it some clean-up fixtures inside tests didn't work properly and left my FakeRedis database in an unclean state:

      from asyncio import AbstractEventLoop, get_event_loop_policy
      from typing import Any, Generator

      import pytest

      @pytest.fixture(scope="session", autouse=True)
      def event_loop() -> Generator[AbstractEventLoop, Any, None]:
          policy = get_event_loop_policy()
          loop = policy.new_event_loop()
          yield loop
          loop.close()

seifertm commented 1 year ago

Maybe there's a smarter way for pytest-asyncio to detect loop changes. I'll leave the issue open for the time being.
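
Until then, detection can live on the application side, along the lines of the workaround described earlier in the thread. The following is a minimal sketch of that idea and not pytest-asyncio behaviour. The class name LoopAwareCache and its factory argument are hypothetical, and asyncio.Queue stands in for a Redis or FakeRedis client:

```python
import asyncio


class LoopAwareCache:
    """Hypothetical helper: rebuilds a cached resource when the running loop changes."""

    def __init__(self, factory):
        self._factory = factory
        self._loop = None
        self._resource = None

    def get(self):
        # Must be called from inside a running loop.
        current_loop = asyncio.get_running_loop()
        if self._resource is None or self._loop is not current_loop:
            # Loop changed (or first use): drop the old resource and rebuild it.
            self._resource = self._factory()
            self._loop = current_loop
        return self._resource


cache = LoopAwareCache(asyncio.Queue)


async def grab():
    return cache.get()


async def grab_twice():
    return cache.get() is cache.get()


# Within one loop the cached object is reused.
same_within_loop = asyncio.run(grab_twice())

# Each asyncio.run() call uses a new loop, so the resource is rebuilt.
first = asyncio.run(grab())
second = asyncio.run(grab())
rebuilt_across_loops = first is not second

print(same_within_loop, rebuilt_across_loops)
```

Keeping a reference to the previous loop makes the identity check reliable. The sketch does not close or otherwise clean up the old resource, which a real client would probably need.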

vprud commented 2 months ago

I faced the same problem when running tests locally.

vprud commented 2 months ago

Interestingly, the error is reproduced only when running a single test module, like pytest path/to/module/with/tests. When running pytest for all tests, like pytest ., there is no error.