redis / redis-py

Redis Python client

is it caused by the unsafe threads? #2865

Open JerryHuang-LE opened 1 year ago

JerryHuang-LE commented 1 year ago

Version:

Platform:

Description:

redis.py:

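import logging

import redis.asyncio as aioredis

_LOG = logging.getLogger(__name__)

# NOTE: `url` is not defined in the original report; it is presumably a parsed
# Redis URL (for example, the result of urllib.parse.urlparse()).
# Module-level pool: shared by every client returned below.
pool = aioredis.ConnectionPool(
    host=url.hostname,  # type: ignore
    port=url.port,  # type: ignore
    username=url.username,
    password=url.password,
    connection_class=aioredis.SSLConnection,
    ssl_cert_reqs=None,
)

def get_connection_async_redis():
    """get a new connection instance for redis (for async framework)"""
    try:
        return aioredis.Redis(
            connection_pool=pool,
            auto_close_connection_pool=False,
        )
    except IOError:
        _LOG.exception("Error when connecting to redis...")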

cache.py:

async def clear_cache(pattern: str) -> None:
    """Clear cache whose key matches the pattern"""

    conn = get_connection_async_redis()
    for k in await conn.keys():
        if pattern in k:
            await conn.delete(k)
    await conn.close()

unit_test.py:

from unittest import IsolatedAsyncioTestCase, main, mock

class TestPublicHolidaysDataRetrieval(IsolatedAsyncioTestCase):

    async def asyncSetUp(self) -> None:
        await clear_cache("key")

    async def test_func_1(self) -> None:
        ...

    async def test_func_2(self) -> None:
        ...

if __name__ == "__main__":
    main()

When you run the unit tests (please make sure the test case has more than one test function), the following error is raised:

...got Future <Future pending created at /usr/local/lib/python3.11/asyncio/base_events.py:427> attached to a different loop
...
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

evgenybf commented 11 months ago

When you close a connection, it gets returned to the pool. IsolatedAsyncioTestCase creates a new event loop for every test it runs, so the second test function receives a connection that was created in a different event loop, one that has already been closed. My guess is that you have to create a new ConnectionPool together with the connection.

JerryHuang-LE commented 11 months ago

@evgenybf thanks, mate, for taking a look. I am not sure I fully understand you, so please correct me if I am wrong. Do you mean we should try to create the connection pool for each single test function? And would that make sure the connection pool and the event loop are created and closed together?

evgenybf commented 10 months ago

@JerryHuang-LE

Do you mean we should try to create the connection pool for each single test function?

Yes

And would that make sure the connection pool and the event loop are created and closed together?

It will make sure that a connection opened in one event loop won't be reused in another one. Check out the implementation behind conn.close() (named aclose() in redis-py 5.x):

    async def aclose(self, close_connection_pool: Optional[bool] = None) -> None:
        """
        Closes Redis client connection

        :param close_connection_pool: decides whether to close the connection pool used
        by this Redis client, overriding Redis.auto_close_connection_pool. By default,
        let Redis.auto_close_connection_pool decide whether to close the connection
        pool.
        """
        conn = self.connection
        if conn:
            self.connection = None
            # Not an actual close: release() returns the connection to the pool
            # so it can be reused on the next get_connection() call.
            await self.connection_pool.release(conn)
        if close_connection_pool or (
            close_connection_pool is None and self.auto_close_connection_pool
        ):
            await self.connection_pool.disconnect()
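To see why "release instead of close" matters across event loops, here is a toy sketch. ToyConnection and ToyPool are hypothetical stand-ins written for this illustration, not redis-py classes, and the Future on the connection merely stands in for whatever loop-bound state a real connection holds.

```python
import asyncio


class ToyConnection:
    """Hypothetical stand-in for a pooled connection (not redis-py code)."""

    def __init__(self) -> None:
        # Remember the loop this connection was created on, plus a Future
        # bound to that loop (standing in for loop-bound connection state).
        self.loop = asyncio.get_running_loop()
        self.waiter = self.loop.create_future()


class ToyPool:
    """Hypothetical pool: release() keeps the connection for later reuse."""

    def __init__(self) -> None:
        self._available = []

    def get_connection(self) -> ToyConnection:
        return self._available.pop() if self._available else ToyConnection()

    def release(self, conn: ToyConnection) -> None:
        self._available.append(conn)


pool = ToyPool()  # module-level, like the pool in the original report


async def borrow_and_release() -> ToyConnection:
    conn = pool.get_connection()
    pool.release(conn)  # the client-side "close": back into the pool
    return conn


async def wait_on(conn: ToyConnection) -> None:
    await conn.waiter  # Future that belongs to the loop that created conn


first = asyncio.run(borrow_and_release())   # first loop, closed afterwards
second = asyncio.run(borrow_and_release())  # second loop, same connection

message = ""
try:
    asyncio.run(wait_on(second))  # third loop awaits a first-loop Future
except RuntimeError as exc:
    message = str(exc)

print("connection reused:", first is second)
print("its loop is closed:", first.loop.is_closed())
print("different-loop error:", "attached to a different loop" in message)
```

The same connection object comes back on the second run, still tied to the closed loop, and awaiting its loop-bound state from a newer loop raises the "attached to a different loop" RuntimeError seen in the report.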

This get_connection_async_redis() implementation doesn't fail for me:

def get_connection_async_redis():
    """get a new connection instance for redis (for async framework)"""
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        db=0,
        #  connection_class=aioredis.SSLConnection,
        #  ssl_cert_reqs=None,
    )

    try:
        return aioredis.Redis(
            connection_pool = pool,
            auto_close_connection_pool=False
        )
    except IOError:
        logger.exception("Error when connecting to redis...")

Alternatively, you can close the pool in asyncTearDown() (at least, it works for your test).

Since I don't usually have more than one event loop running, I prefer having global initialize() and finalize() functions that set up and tear down global "async" resources such as asyncio.Lock, redis/motor clients, etc. In an HTTP service they are called during server start/shutdown; in scripts, by a decorator applied to the main function; in unit tests, from a mixin's asyncSetUp/asyncTearDown.

JerryHuang-LE commented 10 months ago

Really appreciate the details! @evgenybf you are a legend! But actually, I tried to close the pool but I still get the same error. :-(

    async def asyncTearDown(self) -> None:
        await pool.disconnect()

evgenybf commented 10 months ago

@JerryHuang-LE

But actually, I tried to close the pool but I still get the same error. :-(

Sorry, I tested it with redis-py 5.0.1 only.

With redis-py 4.5.3, calling pool.reset() helped, but I'm not sure whether ConnectionPool is supposed to be used after reset(), as there is no documentation on it. Judging by how it's implemented, I think it is.

class ConnectionPool(...):
...
    def reset(self):
        self._lock = asyncio.Lock()
        self._created_connections = 0
        self._available_connections = []
        self._in_use_connections = set()

        # this must be the last operation in this method. while reset() is
        # called when holding _fork_lock, other threads in this process
        # can call _checkpid() which compares self.pid and os.getpid() without
        # holding any lock (for performance reasons). keeping this assignment
        # as the last operation ensures that those other threads will also
        # notice a pid difference and block waiting for the first thread to
        # release _fork_lock. when each of these threads eventually acquire
        # _fork_lock, they will notice that another thread already called
        # reset() and they will immediately release _fork_lock and continue on.
        self.pid = os.getpid()

unit_test.py:

class TestPublicHolidaysDataRetrieval(IsolatedAsyncioTestCase):

    async def asyncSetUp(self) -> None:
        await clear_cache("key")

    async def asyncTearDown(self) -> None:
        await pool.disconnect()
        pool.reset()

    async def test_func_1(self) -> None:
        ...

    async def test_func_2(self) -> None:
        ...

With redis-py==4.5.3

..
----------------------------------------------------------------------
Ran 2 tests in 0.021s

OK

With redis-py==5.0.1

venv/lib/python3.11/site-packages/redis/asyncio/client.py:247: DeprecationWarning: "auto_close_connection_pool" is deprecated since version 5.0.0. Please create a ConnectionPool explicitly and provide to the Redis() constructor instead.
  warnings.warn(
test_async_redis.py:31: DeprecationWarning: Call to deprecated close. (Use aclose() instead) -- Deprecated since version 5.0.0.
  await conn.close()
..
----------------------------------------------------------------------
Ran 2 tests in 0.022s

OK

JerryHuang-LE commented 10 months ago

Thanks a lot, @evgenybf! But hmm, so weird. I just copied your code and ran it, and I still get the same errors on my end. Which Python version are you using? My env is:

redis-py: 4.5.4
redis: 6.0.2
python 3.11.3

evgenybf commented 10 months ago

@JerryHuang-LE same as yours. Would you mind sharing the traceback? Since .reset() clears the connection pool, it can't be the same error.

evgenybf commented 10 months ago

See the test; it's supposed to be run in Docker: https://github.com/evgenybf/redis-async-event-loop-test-tmp

JerryHuang-LE commented 10 months ago

@evgenybf You are a legend. It is working for me now. Thanks, mate.