redis / redis-py

Redis Python client
MIT License
12.4k stars 2.48k forks source link

Connection leak when async Sentinel client is used #3259

Closed evgenybf closed 1 month ago

evgenybf commented 1 month ago

Version: redis[hiredis] = 5.0.1 - 5.0.4

Platform: Python 3.9.19 / docker python:3.9-slim / bitnami/redis:4.0-ol-7, bitnami/redis-sentinel:4.0-ol-7

Description:

Asynchronous version of Sentinel creates unlimited number of connections to redis. When the following script is running, the number of open connections is growing up from 3 to 10000.

import asyncio

from redis.asyncio.sentinel import Sentinel

import settings

async def initialize_redis():
    global client, sentinel

    sentinel = Sentinel(
        settings.REDIS_SENTINEL_ADDRESSES,
        db=settings.REDIS_SENTINEL_DB,
        socket_timeout=2.0,
    )

    client = sentinel.master_for(settings.REDIS_SENTINEL_SERVICE, socket_timeout=2.0)

async def finalize_redis():
    global client, sentinel

    await client.aclose(close_connection_pool=True)
    client = None
    if sentinel:
        # Sentinel class creates ConnetionPools for every hostname in its constructor.
        # Then, sentinel.master_for() creates yet another SentinelConnectionPool which will be
        # closed by client.close(). The problem is that sentinel's connection pools will be
        # left open, which will result in an warning in __del__(). Close them here...
        for sentinel in sentinel.sentinels:
            await sentinel.aclose()
    sentinel = None

async def coro():
    await client.get('test')

async def test():
    await initialize_redis()
    try:
        for i in range(10000):
            print("Run it", i)
            await asyncio.gather(
                *[asyncio.ensure_future(coro()) for i in range(10000)], return_exceptions=True
            )
    finally:
        await finalize_redis()

if __name__ == '__main__':
    asyncio.run(test())
bash-4.2$ redis-cli 
127.0.0.1:6379> info clients
# Clients
connected_clients:5070
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

Update:

It seems that even without Sentinel, the number of connections grows up to the number of coroutines running in parallel: So, it's not a leak

import asyncio

import redis.asyncio as redis

import settings

client = None

async def coro():
    await client.get('test')

async def test():
    global client

    client = redis.Redis.from_url(settings.REDIS_ADDR)

    for i in range(10000):
        print("Run it", i)
        await asyncio.gather(
            *[asyncio.ensure_future(coro()) for i in range(1000)], return_exceptions=True
        )

    await client.aclose()

if __name__ == '__main__':
    asyncio.run(test())
127.0.0.1:6379> info clients
# Clients
connected_clients:1003
evgenybf commented 1 month ago

My apologies. Turned out that it was enough to specify max_connections, as it's None by default. E.g.

    sentinel = Sentinel(
        settings.REDIS_SENTINEL_ADDRESSES,
        db=settings.REDIS_SENTINEL_DB,
        socket_timeout=2.0,
        max_connections=100,
    )

    client = sentinel.master_for(settings.REDIS_SENTINEL_SERVICE, socket_timeout=2.0)
client = redis.Redis.from_url(settings.REDIS_ADDR, max_connections=100)

We still have weird connection leaks in prod, but that's not that simple. Closing the ticket

evgenybf commented 1 month ago

Same code with sync Redis client creates only 1 connection (non-async time consuming function stops the event loop which is expected). That may be the reason why we didn't have such issue before

import asyncio

from redis import Sentinel

import settings

async def initialize_redis():
    global client, sentinel

    sentinel = Sentinel(
        settings.REDIS_SENTINEL_ADDRESSES,
        db=settings.REDIS_SENTINEL_DB,
        socket_timeout=2.0,
        max_connections=100,
    )

    client = sentinel.master_for(settings.REDIS_SENTINEL_SERVICE, socket_timeout=2.0)

async def finalize_redis():
    global client, sentinel

    client.close(close_connection_pool=True)
    client = None
    if sentinel:
        for sentinel in sentinel.sentinels:
            sentinel.close()
    sentinel = None

async def coro():
    client.get('test')

async def test():
    await initialize_redis()
    try:
        for i in range(10000):
            print("Run it", i)
            await asyncio.gather(
                *[asyncio.ensure_future(coro()) for i in range(10000)], return_exceptions=True
            )
    finally:
        await finalize_redis()

if __name__ == '__main__':
    asyncio.run(test())

...
127.0.0.1:6379> info clients
# Clients
connected_clients:4