Closed: stinovlas closed this 9 months ago
@s3rius I'd appreciate your feedback on the problem and proposed solutions. I'm ready to work on this once we agree on the details.
Hi, and thanks for finding this out. I guess the easiest option is to use a blocking pool without any way to change it. We can just add a `timeout` parameter which configures when the exception is raised. By default we can set it to 1 to simulate the non-blocking pool implementation.
Sounds good! I'm not sure whether we have to add a `timeout` argument, since any unknown kwargs are passed to `ConnectionPool` already. But if you want to be more explicit, we can do that as well.

I'll get to it and we can tune the details in the PR.
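For illustration, a minimal sketch of how that kwarg forwarding would look from the user's side once a blocking pool is in place. The exact `ListQueueBroker` signature is an assumption here, not confirmed by this thread:

```python
from taskiq_redis import ListQueueBroker

# Hypothetical usage, assuming unknown kwargs keep being forwarded to the
# underlying pool class: `timeout` would reach BlockingConnectionPool
# without any dedicated broker argument.
broker = ListQueueBroker(
    url="redis://localhost:6379",
    max_connection_pool_size=10,
    timeout=5,  # forwarded kwarg: give up waiting after 5 seconds
)
```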
PR is ready :slightly_smiling_face:.
## Problem

Both `BaseRedisBroker` and `RedisScheduleSource` have an argument `max_connection_pool_size` which is passed to `ConnectionPool`. However, the `ConnectionPool` implementation throws `redis.exceptions.ConnectionError` when the maximum number of connections is exceeded. This exception is not caught and bubbles all the way up, which kills the scheduler (and broker).
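A minimal reproduction of the pool behaviour, independent of taskiq-redis (assumes a Redis server on `localhost:6379`):

```python
import asyncio

from redis.asyncio import ConnectionPool
from redis.exceptions import ConnectionError

async def main() -> None:
    # Pool capped at a single connection, to trigger the limit quickly.
    pool = ConnectionPool.from_url("redis://localhost:6379", max_connections=1)
    first = await pool.get_connection("PING")
    try:
        # Exceeding max_connections raises immediately instead of waiting
        # for the first connection to be released.
        await pool.get_connection("PING")
    except ConnectionError as exc:
        print(f"pool exhausted: {exc}")
    finally:
        await pool.release(first)
        await pool.disconnect()

asyncio.run(main())
```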
## Suggestions

I found out that `redis` provides `redis.asyncio.BlockingConnectionPool`, which waits for a connection instead of throwing the exception. There's a configurable timeout (after which the exception is raised). Despite the name, the asyncio variant of `BlockingConnectionPool` does not actually block the whole program; context is correctly switched on async sleep.

We could leverage this class to provide easier handling of the max connections limit. Otherwise, a user would need to override the `taskiq-redis` classes and replace `ConnectionPool` with `BlockingConnectionPool` manually.
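To make the difference concrete, a small sketch of the blocking pool's waiting behaviour (same assumptions as the reproduction above):

```python
import asyncio

from redis.asyncio import BlockingConnectionPool

async def main() -> None:
    # Same single-connection cap, but with the blocking pool: a second
    # caller waits (yielding to the event loop) until the connection is
    # released, and only raises ConnectionError after `timeout` seconds.
    pool = BlockingConnectionPool.from_url(
        "redis://localhost:6379", max_connections=1, timeout=5
    )
    first = await pool.get_connection("PING")

    async def second_user() -> None:
        conn = await pool.get_connection("PING")  # parks here, no exception
        await pool.release(conn)

    task = asyncio.create_task(second_user())
    await asyncio.sleep(0.1)  # second_user() is waiting, loop keeps running
    await pool.release(first)  # hands the connection over to second_user()
    await task
    await pool.disconnect()

asyncio.run(main())
```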
I see the following possibilities:

1. `connection_pool_cls: Type[ConnectionPool]` for `RedisScheduleSource` and `BaseRedisBroker`. This would accept any `ConnectionPool` subclass (including `BlockingConnectionPool`). This is the one I prefer; a sketch follows below.
2. `connection_pool: ConnectionPool` for `RedisScheduleSource` and `BaseRedisBroker`. This would accept an instance of any `ConnectionPool` subclass (including `BlockingConnectionPool`). The URL would have to be duplicated in this case (passed both to the `ConnectionPool` instance and to `RedisScheduleSource` itself, even if not used, in order to maintain a compatible API).
3. `blocking: bool` for `RedisScheduleSource` and `BaseRedisBroker`. Based on the value, we'd internally decide whether to use `ConnectionPool` or `BlockingConnectionPool`. This is the least flexible option, because the behaviour cannot be easily changed from outside (e.g. by subclassing `ConnectionPool`).

In all cases, the change can be made backwards compatible (although I'd argue that the current behaviour with an uncaught exception doesn't make sense and `BlockingConnectionPool` is a good default). Alternatively, we could:

- Use `BlockingConnectionPool` and throw away `ConnectionPool` altogether. This would minimize the changes (just replace `ConnectionPool` with `BlockingConnectionPool`), but it's a breaking change.
## Notes

~~`redis.asyncio.RedisCluster` does not suffer from the same problem, because it has its own connection pool handling mechanism and already allows for retries.~~ *EDIT: There is actually some problem with cluster reconnects. I created https://github.com/redis/redis-py/issues/3135 to resolve it.*

We should also consider some modification of `RedisAsyncResultBackend` and `RedisAsyncClusterResultBackend`. These classes don't accept any argument to limit the number of simultaneous connections.
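For completeness, a hypothetical sketch of what a connection limit on the result backend side could look like. This class and its method are purely illustrative and don't mirror the actual `RedisAsyncResultBackend` implementation:

```python
from typing import Optional

from redis.asyncio import BlockingConnectionPool, Redis

class LimitedResultBackend:  # hypothetical, mirrors the broker-side change
    def __init__(
        self,
        redis_url: str,
        max_connection_pool_size: Optional[int] = None,
        **connection_kwargs,
    ) -> None:
        self.redis_pool = BlockingConnectionPool.from_url(
            url=redis_url,
            max_connections=max_connection_pool_size,
            **connection_kwargs,
        )

    async def set_result(self, task_id: str, result: bytes) -> None:
        # Every operation borrows a connection from the capped pool.
        async with Redis(connection_pool=self.redis_pool) as redis:
            await redis.set(task_id, result)
```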