redis / redis-py

Redis Python client
MIT License
12.62k stars 2.52k forks source link

"No connection available" errors since 5.0.1 #2995

Open DPham-X opened 1 year ago

DPham-X commented 1 year ago

Version: 5.0.1

Platform: Python 3.11.5(3.11.5 (main, Sep 4 2023, 15:30:52) [GCC 10.2.1 20210110])

Description:

Since 5.0.1, we noticed a significant increase in No connection available. errors from redis. We are using cashews (6.3.0) + redis-py for request caching for our fastapi application. We don't experience this with 5.0.0.

Stack trace:

CancelledError: null
  File "redis/asyncio/connection.py", line 1170, in get_connection
    async with self._condition:
  File "asyncio/locks.py", line 15, in __aenter__
    await self.acquire()
  File "asyncio/locks.py", line 114, in acquire
    await fut
TimeoutError: null
  File "redis/asyncio/connection.py", line 1169, in get_connection
    async with async_timeout(self.timeout):
  File "asyncio/timeouts.py", line 111, in __aexit__
    raise TimeoutError from exc_val
ConnectionError: No connection available.
  File "cashews/backends/redis/client.py", line 26, in execute_command
    return await super().execute_command(command, *args, **kwargs)
  File "redis/asyncio/client.py", line 601, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
  File "redis/asyncio/connection.py", line 1174, in get_connection
    raise ConnectionError("No connection available.") from err
paul-finary commented 1 year ago

Same here, I have cases where "No connection available." is raised but the pool has 4 _available_connections, and only 5 _in_use_connections out of 500 max_connections.

@kristjanvalur I saw you refactored the ConnectionPool / BlockingConnectionPool in 5.0.1, do you have any idea of what would trigger this ?

kristjanvalur commented 1 year ago

async Redis client and connection pools now need to be properly closed.
The bug is in cachews. While the Redis backend there does call await self._client.close(), that does not close the connection pool.

cachews needs to ensure that the Redis instance owns the pool. Or, keep a separate pool somewhere and close it.

async def init(self):
        self._client = self._client_class(connection_pool=self._pool_class.from_url(self._address, **self._kwargs))
        await self._client.initialize()
        self.__is_init = True

should become

async def init(self):
        pool = self._pool_class.from_url(self._address, **self._kwargs)
        self._client = self._client_class.from_pool(pool)
        await self._client.initialize()
        self.__is_init = True

This ensures that the created instance _takesownership of the pool and closes it when itself is closed. The current syntax assumes that the caller will take care of closing the connection pool.

Please consider errors like this to be the result of incorrect usage in the library actually using redis-py.

paul-finary commented 1 year ago

Hi,

Thanks for the quick reply !

In my case, I'm not dependent on cashews, but I'm wrapping redis.asyncio.Redis in my own class, like so:

class Wrapper:
    """
    Defines a super class of Redis to add new methods and delay the connection after the initialization.
    """

    def __init__(self):
        self.redis = None

    async def connect(self):
        self.redis = Redis.from_pool(
            BlockingConnectionPool.from_url(
                os.getenv("URL"),
                db=0,
                decode_responses=True,
                max_connections=500,
                timeout=5,
            )
        )

    async def disconnect(self):
        await self.redis.aclose(close_connection_pool=True)

    def __getattr__(self, name):
        return getattr(self.redis, name)

The thing I don't understand, is that a ConnectionError is till raised even though:

  1. There is _available_connections in the pool
  2. There is less _in_use_connections than max_connections, so the pool can create and add new connections
kristjanvalur commented 1 year ago

can you please attach a traceback?

paul-finary commented 1 year ago

Sure:

CancelledError: null
  File "redis/asyncio/connection.py", line 1170, in get_connection
    async with self._condition:
  File "asyncio/locks.py", line 15, in __aenter__
    await self.acquire()
  File "asyncio/locks.py", line 114, in acquire
    await fut
TimeoutError: null
  File "redis/asyncio/connection.py", line 1169, in get_connection
    async with async_timeout(self.timeout):
  File "__init__.py", line 141, in __aexit__
    self._do_exit(exc_type)
  File "__init__.py", line 228, in _do_exit
    raise asyncio.TimeoutError
ConnectionError: No connection available.
  File "app/models/mixins/notifiable_mixin.py", line 66, in _notify_update
    await wrapper.lpush(f"queue:{queue}", payload)
  File "ddtrace/contrib/redis/asyncio_patch.py", line 22, in traced_async_execute_command
    return await _run_redis_command_async(span=span, func=func, args=args, kwargs=kwargs)
  File "ddtrace/contrib/redis/asyncio_patch.py", line 40, in _run_redis_command_async
    result = await func(*args, **kwargs)
  File "redis/asyncio/client.py", line 601, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
  File "redis/asyncio/connection.py", line 1174, in get_connection
    raise ConnectionError("No connection available.") from err
kristjanvalur commented 1 year ago

That is weird. it is timing out trying to acquire the condition variable. That should never happen Except, this probably occurs at startup and there is a serialization problem here where the condition variable is held too long. This problem does not have anything to do with the previous one. I'll see if I can make a simple patch.

iamdbychkov commented 1 year ago

Hello @kristjanvalur Excuse me for bothering you, but why we have to close connections if redis instance never goes out of scope? For example if we have a long-running application isn't it better to simply create a pool and then use it for all requests? Does it mean that all open IDLE connection will be there forever and we should clean it all up ourselves if we don't need it at the moment?

Is there any docs on intended use of a async version of redis?

Personally, now I have a troubles with "ERR max number of clients reached" by redis itself, it's a completely different problem but couldn't help myself commenting, as documentation kinda glosses over async version of a client and you have to look everywhere

kristjanvalur commented 1 year ago

Hi @paul-finary , Could you test with the new redis/asyncio/connection.py which is part of the above-mentioned PR?

kristjanvalur commented 1 year ago

Hello @kristjanvalur Excuse me for bothering you, but why we have to close connections if redis instance never goes out of scope? For example if we have a long-running application isn't it better to simply create a pool and then use it for all requests? Does it mean that all open IDLE connection will be there forever and we should clean it all up ourselves if we don't need it at the moment?

Hi there. You don't have to. But if you are having this problem now, when you weren't having it before, it meant that you were losing connection objects and not explicitly closing them. The old version of redis-py would try to clean them up inthe del handlers, but this is really, really bad practice. For one thing, it causes all async jobs to block.

If you are seing this problem now, it means probably that there is some incorrect use of a connection pool, or a connection is not being correctly returned to the pool. What is your use case?

For using a pool object and re-using it, this is the pattern you should be using:


pool = redis.asyncio.ConnectionPool(...)
# make sure we close the pool when we are don
async with contextlib.aclosing(pool):
    for whatever in jobs():
        # make sure we return the connection to the pool after use
        async with redis.asyncio.Redis(connection_pool=pool) as redis:
            redis.do stuff()
kristjanvalur commented 1 year ago

If you forget to return/close connections, then Python will print out a "resource warning" when the connection object gets garbage collected.

DPham-X commented 1 year ago

async Redis client and connection pools now need to be properly closed. The bug is in cachews. While the Redis backend there does call await self._client.close(), that does not close the connection pool.

cachews needs to ensure that the Redis instance owns the pool. Or, keep a separate pool somewhere and close it.

async def init(self):
        self._client = self._client_class(connection_pool=self._pool_class.from_url(self._address, **self._kwargs))
        await self._client.initialize()
        self.__is_init = True

should become

async def init(self):
        pool = self._pool_class.from_url(self._address, **self._kwargs)
        self._client = self._client_class.from_pool(pool)
        await self._client.initialize()
        self.__is_init = True

This ensures that the created instance _takesownership of the pool and closes it when itself is closed. The current syntax assumes that the caller will take care of closing the connection pool.

Please consider errors like this to be the result of incorrect usage in the library actually using redis-py.

Thanks, I'll follow up with cashews

paul-finary commented 1 year ago

@kristjanvalur Thanks for being so reactive ! I'm currently trying to reproduce the issue as I did not yet find a consistent trigger for the "No connection available" error. I'll keep you posted !

kristjanvalur commented 1 year ago

@kristjanvalur Thanks for being so reactive ! I'm currently trying to reproduce the issue as I did not yet find a consistent trigger for the "No connection available" error. I'll keep you posted !

My hypothesis is this: You create a pool, and immediately start a few hundred requests. However, the current implementation will make the socket connection within the condition variable lock. and so, all the tasks are waiting, on the timeout, while one after the other finishes their connections. A few hundred socket connects can easily take up to 5 seconds, and then finally one of them times out. Maybe you can repro using that, maybe your server was far away or something?

My PR should fix this, now the condition variable is only around pool management, not the socket connection code.

iamdbychkov commented 1 year ago

@kristjanvalur Thank you for your responsiveness! Feeling a little embarrassed and I'll understand if you keep my question unanswered as you're not obliged to and it's clearaly not a SO or some education course, and even this issue has nothing to do with my problem.

If you are seing this problem now, it means probably that there is some incorrect use of a connection pool, or a connection is not being correctly returned to the pool. What is your use case?

we're using redis-py 5.0.0 and problem came with the update of a perfomance requirements.

It's a web service which has to hit redis to process requests. Recently we got our requirements updated - now we have to handle 2k RPS on average.

I have a class which encapsulates redis handling:

class RedisCodeClient:

    def __init__(self, pool: ConnectionPool):
        self._conn_pool = pool
        self._active_db_number = 0

    async def select_active_db(self):
        async with Redis(connection_pool=self._conn_pool, db=0) as redis:
            active_db_number = await redis.get('active_db')
            if not active_db_number:
                return
            self._active_db_number = active_db_number

    async def get_code_data(self, code: str) -> list[dict[str, str]]:
        async with Redis(connection_pool=self._conn_pool, db=self._active_db_number) as redis:
            async with redis.pipeline(transaction=False) as pipe:
                for number in [code[:i] for i in range(len(code), 0, -1)]:
                    pipe.hgetall(number)
                return await pipe.execute()

This is a class, instance of which are created on the first request when we ask for it in our handlers:

@functools.lru_cache()
def get_code_client():
    settings = get_settings()
    pool = ConnectionPool(
        host=settings.CODE_REDIS_HOST,
        port=settings.CODE_REDIS_PORT,
        db=settings.CODE_REDIS_DB,
        decode_responses=True,
    )
    return RedisCodeClient(pool=pool)

 ...

 async def get_code_data(
        client: RedisCodeClient, code: str
) -> list[dict[str, str]]:
    await client.select_active_db()
    return await client.get_code_data(code)

 ...

 async def example(code: str):
    redis = get_code_client()
    code_data = await get_code_data(redis, code)

When we're testing our service at high RPS (~1.5k) - we hit the redis maxclients limit (in our case more than 5k connections) and redis become unresponsive untill we stop our application as connections are never closed (and we don't really want them to be closed as request rate is somewhat constant) The solution were to use a BlockingConnectionPool with max_connections param set, but now we're facing timeouts :( I understand the nature of a problem but to me it seems it can't be solved without tweaking hardware or rewriting everything to use a single pipelined connection and callback like interfaces.

EDIT:

If somebody stumbles upon my code and for any reason decide to use it for their own projects be careful there is nasty bug with selecting logical database in this code. parameter db passed to Redis class initializer won't do anything if you pass in Connection pool. This parameter have to be defined on the pool itself. So for selecting logical database - you need to use another approach, my approach was to use two separate connection pools and asyncio synchronizing primitives

kristjanvalur commented 1 year ago

The solution were to use a BlockingConnectionPool with max_connections param set, but now we're facing timeouts :( I understand the nature of a problem but to me it seems it can't be solved without tweaking hardware or rewriting everything to use a single pipelined connection and callback like interfaces.

Hi there. Superficially, your code looks okay. The only leakage here is that I see is the global ConnectionPool being created but that is okay, since there is only one instance.

The fact that changing to BlockingConnectionPool with max_params causes you to block, indicates that somehow, connections are not being returned to the pool...

I'll investigate this a bit with the code you supplied.

paul-finary commented 1 year ago

@kristjanvalur Based on my tests, it looks like your PR indeed fixed the issue with the ConnectionError raised ! Thanks again for taking care of this, this quickly.

kristjanvalur commented 1 year ago

@iamdbychkov I am unable to reproduce your issue with the code provided.
For 5.0.0, I can verify that all the connections are indeed returned to the pool. Is it possible that your web application / framework is using multiple threads, as well as Tasks?

Despite that, I think I can improve on the safety of the library. I'll prepare a PR which should improve things and produce relevant warnings when connections are lost.

iamdbychkov commented 1 year ago

@kristjanvalur My bad! I'm sorry, should've mentioned. Yes, every request is separate asyncio task. My guess is that a reason for everything not working is just a fact that incoming rate of requests is much greater than our redis instance is able to handle - new connections keep opening, as others are busy and we hit the redis max_clients limit. When we were developing apps in python2 we had our own implementation of redis async client which basically pipes all requests into a single connection and then calls callbacks. I guess it added to my confusion :(

I've tweaked redis to handle more than 5k connections and we've scheduled our tests on monday. Thank you for your responsiveness, you're being a real role model!

Krukov commented 1 year ago

@kristjanvalur

  1. Does redis-py follow Semantic Versioning ? if so 5.0.0 -> 5.0.1 shouldn't brake any third party libs or applications that use redis-py (PATCH version when you make backward compatible bug fixes, rather MINOR release should have a backward compatible changes).

Can you introduce a MAJOR release for the backward incompatible changes?

  1. auto_close_connection_pool is a public property. Is it? Is it allowed to change directly ?
kristjanvalur commented 1 year ago

Hello @Krukov . As I already explained before, I am not a maintainer of this repository, just a regular contributor.

  1. I'm not sure what you are referring to here. what is it that you think broke? If you are talking about register_conneciton_callback(), I have explained above that that function was not considered to be a public api by the maintainers and was not documented as such. That said, there is already a PR in this repo to restore those functions and make them official. Let's see what happens.
  2. I don't think we changed the functionality of this attribute. But we did add a deprecation message, suggesting that libraries don't use it. Do you think that it should not be allowed to deprecate functionality in patch versions?

Cheers!

kristjanvalur commented 1 year ago

@kristjanvalur My bad! I'm sorry, should've mentioned. Yes, every request is separate asyncio task. My guess is that a reason for everything not working is just a fact that incoming rate of requests is much greater than our redis instance is able to handle - new connections keep opening, as others are busy and we hit the redis max_clients limit. When we were developing apps in python2 we had our own implementation of redis async client which basically pipes all requests into a single connection and then calls callbacks. I guess it added to my confusion :(

I've tweaked redis to handle more than 5k connections and we've scheduled our tests on monday. Thank you for your responsiveness, you're being a real role model!

Hi. I was wondering, in addition to Tasks, maybe your web stack is also using threads? I'm just guessing here, it could be an avenue to explore, since your global ConnectionPool would then need to be thread safe.

iamdbychkov commented 1 year ago

Hi. I was wondering, in addition to Tasks, maybe your web stack is also using threads? I'm just guessing here, it could be an avenue to explore, since your global ConnectionPool would then need to be thread safe.

Yes, we do spawn threads for some jobs, but not for request handling. Threads we spawn do not interfere with asyncio/redis part. At least to my knowledge. We're using FastAPI as our web-framework. I'll look it up, thanks for the tip!

Krukov commented 1 year ago

@kristjanvalur Sorry, my bad, I thought that auto_close_connection_pool was changed from True to False but now I see that it's not (that was not obvious because it was True by default in constructor but switched to false soon). But in this case I can't find a glue between bug in cashews about owning a pool and "No connection available" error. More likely that is related to the problem that fixed by this MR

  1. You had added deprecation message if someone use it in constructors but what if some one change it directly
    client = Redis(...)
    client.auto_close_connection_pool = True 
kristjanvalur commented 1 year ago

@kristjanvalur Sorry, my bad, I thought that auto_close_connection_pool was changed from True to False but now I see that it's not (that was not obvious because it was True by default in constructor but switched to false soon).

No, it will not be switched to false. Currently, (and this has always been the case) this constructor argument is ignored if someone also supplies a "connection_pool" argument. The idea of this argument was to be able to steal the connection pool which Redis automatically creates, and use it for something else.

redis = Redis(auto_close_connection_pool=False)
pool = redis.connection_pool.
await redis.aclose()
#pool is still open and I can do what I want with it.  but I need to close it if I don't want my connections to leak.

This was the only reason for this argument, to deal with cases where someone wanted to use the ConnectionPool that was created automatically as part of the Redis instance, after closing the Redis.

In this case here:

pool = ConnectionPool()
redis = Redis(connection_pool=pool, auto_close_connection_pool=True)

the auto_close_connection_pool is ignored. Redis will not automatically close the pool for you. It assumes that since you supplied the pool, you are also responsible for closing it.

Confusing? I think so. This is why we are deprecating it.

But in this case I can't find a glue between bug in cashews about owning a pool and "No connection available" error. More likely that is related to the problem that fixed by this MR

The correct way, if you create your Pool yourself, is to do

pool = ConnectionPool()
redis = Redis(connection_pool = pool)
...
await redis.aclose()
await pool.close()

or use the new function, from_pool, which is like modifying the auto_close_connection_pool attribute after creating:

pool = ConnectionPool()
redis = Redis.from_pool(pool)
...
await redis.aclose()
# pool is now also closed

The reason you are seeing the "No connection available" now, is that garbage collection has changed. The latest version, the library will not automatically close connections that you forget about. This is because it is hard to do so reliably, there are lots of problems with doing that. And it is the responsibility of the user to make sure they keep track of and close their connections.

That said, I have a PR in this repo which restores some of the safety of the garbage collection, but will also output a ResourceWarning if you forget to close a connection,

  1. You had added deprecation message if someone use it in constructors but what if some one change it directly
client = Redis(...)
client.auto_close_connection_pool = True 

Well, this is actually what the from_pool() constructor does. The attribute is not a publicly documented attribute. We need it to be there, for the implementation. I guess we could deprecated setting it too, but it would be a bit cumbersome.

851459955 commented 1 year ago

I have a question, is it really necessary to set 'auto_close_connection_pool' to true in sentinel working mode? In redis-cluster and single redis connection, it is a single connection. Every time a request comes in, a new connection is created and the request is closed. It is normal, but in the sentinel working mode, redis does not directly create the connection pool. Instead, it creates the redis connection itself after getting all the connections from sentinel. When 'auto_close_connection_pool' is true, 'master_for' and 'slave_for' ' All the connections created will be closed, causing each request to reacquire the master and slave of the sentinel connection, and then create a redis connection. Normally, it should not be the case. Only a new redis connection is initialized during the request process. When this Refresh the sentinel connection only when the connection fails and reacquire the master-slave to update to the new master-slave redis connection. Also, the 'connection' created in redis-sentinel has no effect in the entire fastapi request. It is executed every time. A new connection is taken from the sentinel connection pool to execute the command, but the same one is not reused. This may be a problem with my writing method.

redis: Sentinel connections will be reused before 5.0. After 5.1, auto_close_connection_pool is true and cannot be reused.

from typing import Dict

from fastapi import FastAPI
from redis.asyncio import Sentinel, Redis

from config.settings import DATA_CONFIG

# redis
REDIS_SENTINEL_HOST0 = DATA_CONFIG.redis_sentinel_host0
REDIS_SENTINEL_HOST1 = DATA_CONFIG.redis_sentinel_host1
REDIS_SENTINEL_HOST2 = DATA_CONFIG.redis_sentinel_host2
REDIS_SENTINEL_PASSWORD = DATA_CONFIG.redis_sentinel_password
REDIS_PASSWORD = DATA_CONFIG.redis_password
REDIS_SENTINEL_PORT = DATA_CONFIG.redis_sentinel_port
REDIS_SENTINEL_SERVICE_NAME = DATA_CONFIG.redis_sentinel_service_name
REDIS_DB_0 = DATA_CONFIG.redis_db_0
REDIS_DB_1 = DATA_CONFIG.redis_db_1
REDIS_DB_2 = DATA_CONFIG.redis_db_2

redis_db_master_dict: Dict[int, Redis]
redis_db_slave_dict: Dict[int, Redis]

def init_db_session(apps: FastAPI) -> None:

    @apps.on_event('startup')
    async def _init_db() -> None:
        global redis_db_master_dict
        global redis_db_slave_dict

        redis_sentinel = Sentinel(sentinels=[(REDIS_SENTINEL_HOST0, REDIS_SENTINEL_PORT),
                                             (REDIS_SENTINEL_HOST1, REDIS_SENTINEL_PORT),
                                             (REDIS_SENTINEL_HOST2, REDIS_SENTINEL_PORT)],
                                  sentinel_kwargs={
                                      'password': REDIS_SENTINEL_PASSWORD,
                                      'decode_responses': True
                                  },
                                  password=REDIS_PASSWORD,
                                  decode_responses=True)

        redis_db_master_dict = {
            REDIS_DB_0:
            redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_0),
            REDIS_DB_1:
            redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_1),
            REDIS_DB_2:
            redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_2)
        }

        redis_db_slave_dict = {
            REDIS_DB_0:
            redis_sentinel.slave_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_0),
            REDIS_DB_1:
            redis_sentinel.slave_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_1),
            REDIS_DB_2:
            redis_sentinel.slave_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_2)
        }

async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_db_master_dict
    client = redis_db_master_dict[db]
    async with client as redis_write:
        return redis_write

async def redis_slave(db: int = REDIS_DB_0) -> Redis:
    global redis_db_slave_dict
    async with redis_db_slave_dict[db] as redis_read:
        return redis_read
from config.db import (REDIS_DB_1, REDIS_DB_2, redis_master, redis_slave, init_sqlalchemy_client)

async def redis_master_0():
    yield await redis_master()

async def redis_slave_0():
    yield await redis_slave()

RedisMaster0 = Annotated[Redis, Depends(redis_master_0)]
RedisSlave0 = Annotated[Redis, Depends(redis_slave_0)]

Don't quite understand how it works Before calling the request async def aenter(self: _RedisT) async def aexit(self, exc_type, exc_value, traceback) The execution is completed, resulting in ‘self.connection‘ never being created, and it is always the new connection.

@kristjanvalur

kristjanvalur commented 1 year ago

Every time you call slave_for() a new Redis() instance is created, with a new ConnectionPool.. I see in your code that you do this globally. Therefore, these objects will not get deleted, they will live forever. The change in 5.0.1 was to make sure that when you close the client, you also close this automatically created connection pool, otherwise, there would be no way to close it.

Your problem is a different one. You don't want the "async with". You are keeping the Redis objects that you got from master_for() and slave_for(), in a global dict, and not deleting / closing them. the async with is only about automatically calling Redis.aclose() for you. This will close the connection pool. Your solution, is to not close them, not use the async for.

Instead, just do the following:

async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_db_master_dict
    return redis_db_master_dict[db]

the async with pattern is for ensuring that you close all connections after you are done with them:

async with sentinel.slave_for(...) as client:
    await client.do_something()
    await client.do_something_different()
# now , 'client' is closed, and all its connections too.

Cheers!

851459955 commented 1 year ago

Every time you call a new Redis() instance is created, with a new ConnectionPool.. I see in your code that you do this globally. Therefore, these objects will not get deleted, they will live forever. The change in 5.0.1 was to make sure that when you close the client, you also close this automatically created connection pool, otherwise, there would be no way to close it.slave_for()

Your problem is a different one. You don't want the "async with". You are keeping the objects that you got from and , in a global dict, and not deleting / closing them. the is only about automatically calling for you. This will close the connection pool. Your solution, is to not close them, not use the .Redis``master_for()``slave_for()``async with``Redis.aclose()``async for

Instead, just do the following:

async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_db_master_dict
    return redis_db_master_dict[db]

the pattern is for ensuring that you close all connections after you are done with them:async with

async with sentinel.slave_for(...) as client:
    await client.do_something()
    await client.do_something_different()
# now , 'client' is closed, and all its connections too.

Cheers!

I‘ll give it a try. thank you

851459955 commented 1 year ago
 async def initialize(self: _RedisT) -> _RedisT:
        if self.single_connection_client:
            async with self._single_conn_lock:
                if self.connection is None:
                    self.connection = await self.connection_pool.get_connection("_")
        return self

I found that in the connection of sentinel, initializing async def __aenter__ with async with did not enter the step of creating a connection. single_connection_client directly returned self for false

Wouldn’t such transformations async def __aenter__ and async def __aexit__ be meaningless? This was also a question I had before

Or should I execute it after master_for? After I get my dict and return it, I can use async with redis_master.client() to directly initialize a redis connection.

I tried it and I found that when the redis-sentinel program fails over My program will report an error on each request and refresh the redis connection. Does this mean that the entire connection created by sentinel still needs to be closed in order to refresh the master-slave connection before each request comes in, ensuring that it is the latest every time? redis master-slave connection

# Error reporting location  `redis.asyncio.client` 614 row beginning
try:
    return await conn.retry.call_with_retry(
        lambda: self._send_command_parse_response(
            conn, command_name, *args, **options
        ),
        lambda error: self._disconnect_raise(conn, error),
    )
finally:
    if self.single_connection_client:
        self._single_conn_lock.release()
    if not self.connection:
        await pool.release(conn)

#  error info
redis.exceptions.ConnectionError: Connection closed by server.

Do I need to change the program like this?

async def redis_master_1():
   async with redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=REDIS_DB_0) as redis_client:
      async with redis_client.client() as redis_write:
          yield redis_write

Last attempt result

async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    return redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db).client()

async def redis_slave(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    return redis_sentinel.slave_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db).client()
from config.db import (REDIS_DB_1, REDIS_DB_2, redis_master, redis_slave)

async def redis_master_0():
    async with await redis_master() as redis_write:
        yield redis_write

async def redis_slave_0():
    async with await redis_slave() as redis_read:
        yield redis_read

RedisMaster0 = Annotated[Redis, Depends(redis_master_0)]
RedisSlave0 = Annotated[Redis, Depends(redis_slave_0)]

This works normally, but auto_close_connection_pool will not be closed.

@kristjanvalur

kristjanvalur commented 1 year ago

Redis can work in "single_connection_client" mode, but normally it doesn't. In Sentinel mode it does not. and __aenter__() does nothing. Don't worry about single connection mode.

(Actually, a single connection client is created when you call redis.client(), ... I'll get to that later.)

In normal mode, this is what happens:

  1. redis=Redis() is created. An internal ConnectionPool is created.
  2. you do await redis.execute_command indirecly via e.g. await redis.get(). This is what happens a. a connection is got from the pool, maybe created b. the command is executed c. the connection is put back to the pool.
  3. repeat step 2 above, for many Tasks, in parallel, for a long time.
  4. call await redis.aclose(). This closes all connections in the pool.
redis = Sentinel.master_from()
async with redis:
    for request in requests:
       await redis.get(key)   # just get and return a connection automatically
# now all the pool is closed.

The purpose of async with is chiefly to remember to call redis.close(), which again is to call redis.connection_pool.close(). You only call redis.aclose() if you do not want to use this redis instance again.

Redis.client()

But it is also possible to use the client() method, to get a private client connection if you want to do many things with it on the same Task:

  1. redis=Redis() is created. An internal ConnectionPool is created.
  2. cli = redis.client(). This creates a single connection client, borrowing the same pool as redis
  3. async with client, will get the connection, allowing you to use it, and _return it to the pol after`
redis = Sentinel.master_from()
async with redis:
    for request in requests:
       async with redis.client() as cli:   # get a private single-connection client. makes sense if you want to do many operations.
           await cli.get(key)
       # now connection goes back to the pool
# now all the pool is closed.

Your case:

Now, as I understand it: 1) you have a global Sentinel. 2) You want a global master_for(), that can re-use the connections for different "requests"

The answer is simple: do not call redis.aclose(). Keep it open. Do not use async with on the master Redis() object.

You can use redis.client() as in your code. That will create a new single_connection_client, using the same pool as the parent.

solution 1. No client()

You just use the same master Redis() object for each connection

# cache the returned redis object.  only create _one_ for each db
@functools.cache
async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    # create a new Redis object, connected to this service, as master
    # it has its own, _private_, ConnectionPool, which is bound to the unique address of this
    # particular server.
    return redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db)

# a fastAPI dependency
async def redis_master_0():
    # just a short cut to get the master for 0
    return redis_master(0)

#fastapi endpoint handler
@fastapi.get # or whatever
async def do_something(redis_master_0):
    # each request is sharing the same master object
    value = await redis_master_0.get("key")

solution 2: with redis.client()

Here you to create a private Redis object for each request

# cache the returned redis object.  only create _one_ for each db
@functools.cache
async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    # create a new Redis object, connected to this service, as master
    # it has its own, _private_, ConnectionPool, which is bound to the unique address of this
    # particular server.
    return redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db)

# a fastAPI dependency
async def redis_master_0():
    # get a single connection client to use for a request. It shares the pool with the parent. closing it will not close the pool.
    async with redis_master(0).client() as single_connection_client:
        yield single_connection_client
    # here, the single_connection_client is closed at the end of the request.  Closing it just meand
    # returning it to the pool.

#fastapi endpoint handler
@fastapi.get # or whatever
async def do_something(redis_master_0):
    # each request gets a new private Redis, and must close it.  the fastapi dependency will take care of that.
    value = await redis_master_0.get("key")
851459955 commented 1 year ago

Redis can work in "single_connection_client" mode, but normally it doesn't. In Sentinel mode it does not. and __aenter__() does nothing. Don't worry about single connection mode.Redis可以在“single_connection_client”模式下工作,但通常不会。 在Sentinel模式下,它不会。 #20002;什么都不做。 不要担心单一连接模式。

(Actually, a single connection client is created when you call redis.client(), ... I'll get to that later.)(实际上,当您调用redis.client()时,会创建一个连接客户端,... 我以后再谈这个问题。)

In normal mode, this is what happens:在正常模式下,会发生以下情况:

  1. redis=Redis() is created. An internal ConnectionPool is created.redis=Redis()创建 创建内部ConnectionPool
  2. you do await redis.execute_command indirecly via e.g. await redis.get(). This is what happens a. a connection is got from the pool, maybe created b. the command is executed c. the connection is put back to the pool.你做await redis.execute_command间接地通过例如。await redis.get(). 这就是结局 a.从池中获取连接, B.执行命令 C.连接被放回到池。
  3. repeat step 2 above, for many Tasks, in parallel, for a long time.重复上述步骤2,对于多个任务,并行,长时间。
  4. call await redis.aclose(). This closes all connections in the pool.呼叫await redis.aclose()。 这将关闭池中的所有连接。
redis = Sentinel.master_from()
async with redis:
    for request in requests:
       await redis.get(key)   # just get and return a connection automatically
# now all the pool is closed.

The purpose of async with is chiefly to remember to call redis.close(), which again is to call redis.connection_pool.close(). You only call redis.aclose() if you do not want to use this redis instance again.async with的目的主要是记住调用redis.close(),这又是调用redis.connection_pool.close()。 如果你不想再使用这个redis实例,你只能调用redis.aclose()

Redis.client()

But it is also possible to use the client() method, to get a private client connection if you want to do many things with it on the same Task:但是如果你想用它做很多事情,也可以使用client()方法来获得一个私有客户端连接 同样的Task

  1. redis=Redis() is created. An internal ConnectionPool is created.redis=Redis()创建 创建内部ConnectionPool
  2. cli = redis.client(). This creates a single connection client, borrowing the same pool as redis``cli = redis.client(). 这将创建一个连接客户端,借用与redis相同的池
  3. async with client, will get the connection, allowing you to use it, and _return it to the pol after`async with client,将获取连接,允许您使用它,并_在`之后将其返回到poll
redis = Sentinel.master_from()
async with redis:
    for request in requests:
       async with redis.client() as cli:   # get a private single-connection client. makes sense if you want to do many operations.
           await cli.get(key)
       # now connection goes back to the pool
# now all the pool is closed.

Now, as I understand it:据我所知

  1. you have a global Sentinel.你有一个全球哨兵。
  2. You want a global master_for(), that can re-use the connections for different "requests"您需要一个全局master_for(),它可以为不同的“请求”重用连接

The answer is simple: do not call redis.aclose(). Keep it open. Do not use async with on the master Redis() object.答案很简单:不要打电话#1。 保持开放。 不要在主Redis()对象上使用redis.aclose()

You can use redis.client() as in your code. That will create a new single_connection_client, using the same pool as the parent.你可以在代码中使用redis.client()。 这将创建一个新的single_connection_client,使用与父节点相同的池。

client()

You just use the same master Redis() object for each connection你只需要为每个连接使用相同的主Redis()对象

# cache the returned redis object.  only create _one_ for each db
@functools.cache
async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    # create a new Redis object, connected to this service, as master
    # it has its own, _private_, ConnectionPool, which is bound to the unique address of this
    # particular server.
    return redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db)

# a fastAPI dependency
async def redis_master_0():
    # just a short cut to get the master for 0
    return redis_master(0)

#fastapi endpoint handler
@fastapi.get # or whatever
async def do_something(redis_master_0):
    # each request is sharing the same master object
    value = await redis_master_0.get("key")

redis.client()

Here you to create a private Redis object for each request在这里,您可以为每个请求创建一个私有Redis对象

# cache the returned redis object.  only create _one_ for each db
@functools.cache
async def redis_master(db: int = REDIS_DB_0) -> Redis:
    global redis_sentinel
    # create a new Redis object, connected to this service, as master
    # it has its own, _private_, ConnectionPool, which is bound to the unique address of this
    # particular server.
    return redis_sentinel.master_for(service_name=REDIS_SENTINEL_SERVICE_NAME, db=db)

# a fastAPI dependency
async def redis_master_0():
    # get a single connection client to use for a request. It shares the pool with the parent. closing it will not close the pool.
    async with redis_master(0).client() as single_connection_client:
        yield single_connection_client
    # here, the single_connection_client is closed at the end of the request.  Closing it just meand
    # returning it to the pool.

#fastapi endpoint handler
@fastapi.get # or whatever
async def do_something(redis_master_0):
    # each request gets a new private Redis, and must close it.  the fastapi dependency will take care of that.
    value = await redis_master_0.get("key")

Thank you for your solution. The second solution is a method that I feel is more understandable, and it indeed reduces the creation process. Because I still think that the connection recycling operation is necessary, because I cannot guarantee that my program will not generate errors during operation.

Thank you very much for your support