aio-libs-abandoned / aioredis-py

asyncio (PEP 3156) Redis support
https://aioredis.readthedocs.io/
MIT License
2.3k stars, 334 forks

Pubsub does not auto-reconnect with get_message() #1130

Open luhn opened 3 years ago

luhn commented 3 years ago

Counterpart of https://github.com/andymccurdy/redis-py/issues/1572

Run the following script:

import itertools
import traceback
import asyncio

import aioredis

async def consumer(pubsub):
    await pubsub.subscribe('test')
    while True:
        try:
            message = await pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message:
                print(f'Receive: {message}')
        except asyncio.CancelledError:
            raise
        except Exception:
            traceback.print_exc()
            await asyncio.sleep(1.0)

async def producer(redis):
    for i in itertools.count():
        try:
            print(f'Publish {i}')
            await redis.publish('test', str(i))
        except Exception:
            traceback.print_exc()
        await asyncio.sleep(1.0)

loop = asyncio.get_event_loop()
redis = aioredis.from_url('redis://', decode_responses=True)
loop.create_task(consumer(redis.pubsub()))
loop.run_until_complete(producer(redis))

While running, stop the Redis server and then start it again.

Expected behavior: publish() and get_message() will fail while server is stopped, then succeed again after server restarts.

Observed behavior: publish() resumes working, but get_message() continues to fail with the following traceback:

Traceback (most recent call last):
  File "/Users/luhn/Code/aioredis-py/test.py", line 12, in consumer
    message = await pubsub.get_message(
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4154, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/client.py", line 4034, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 850, in can_read
    return await self._parser.can_read(timeout)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 453, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/Users/luhn/Code/aioredis-py/aioredis/connection.py", line 464, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.

Andrew-Chen-Wang commented 3 years ago

This is great code, but I believe we discussed before that reconnecting is not the job of redis-py or aioredis. Personally, I agree, since people may want their own implementation for handling server failures.

luhn commented 3 years ago

Currently when a connection breaks, get_message() behaves differently than Redis.publish(), PubSub.subscribe(), or whatever your favorite command might be. That's why I'm reporting a bug. It should work exactly the same as everything else.

There's even a whole connect callback that resubscribes to all subscribed topics upon reconnection, so the implementation is clearly meant to reconnect.
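For illustration, the connect-callback idea can be sketched roughly as below. This is a simplified model, not aioredis's actual code: `SketchConnection`, `SketchPubSub`, `register_connect_callback`, and `demo_resubscribe` are hypothetical names chosen for this example, and "sending" a command only records it in a list.

```python
import asyncio


class SketchConnection:
    """Hypothetical connection that runs registered callbacks after connecting."""

    def __init__(self):
        self.connected = False
        self.sent = []        # commands "sent" to the server, recorded for inspection
        self._callbacks = []  # coroutines to run after every (re)connect

    def register_connect_callback(self, callback):
        self._callbacks.append(callback)

    async def connect(self):
        self.connected = True
        for callback in self._callbacks:
            await callback(self)

    async def disconnect(self):
        self.connected = False

    async def send_command(self, *args):
        # Lazily (re)connect, which is what triggers the callbacks.
        if not self.connected:
            await self.connect()
        self.sent.append(args)


class SketchPubSub:
    """Hypothetical pub/sub object that remembers its channels."""

    def __init__(self, conn):
        self.conn = conn
        self.channels = set()
        conn.register_connect_callback(self.on_connect)

    async def on_connect(self, conn):
        # After a fresh connection, re-issue SUBSCRIBE for known channels.
        if self.channels:
            await conn.send_command("SUBSCRIBE", *sorted(self.channels))

    async def subscribe(self, *channels):
        await self.conn.send_command("SUBSCRIBE", *channels)
        self.channels.update(channels)


async def demo_resubscribe():
    conn = SketchConnection()
    pubsub = SketchPubSub(conn)
    await pubsub.subscribe("test")
    await conn.disconnect()  # simulate the connection being dropped
    await conn.connect()     # reconnecting re-issues the subscription
    return conn.sent
```

In this model the callback only fires when something actually reconnects, which is why it matters that the broken connection gets dropped first.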

Andrew-Chen-Wang commented 3 years ago

I don't see where publish has a reconnection option. subscribe needs reconnection since it's long-lived, whereas publish and get_message are single-execution commands.

luhn commented 3 years ago

Maybe "reconnect" is the wrong word. All get_message needs to do is call conn.disconnect() when a ConnectionError occurs. Connection will handle the rest.

I'm not sure what publish() does behind the scenes, but the effect is the same: the next call to publish() will use a new connection.
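A minimal sketch of that idea, under stated assumptions: `FakeServer`, `FakeConnection`, and this standalone `get_message` are hypothetical stand-ins written for this example, not aioredis classes, and the built-in `ConnectionError` is used in place of `aioredis.exceptions.ConnectionError`. The fake connection remembers which server "generation" its socket belongs to, so a socket opened before a restart stays broken until it is dropped.

```python
import asyncio


class FakeServer:
    """Hypothetical server that can be stopped and restarted."""

    def __init__(self):
        self.up = True
        self.generation = 0  # bumped on every restart

    def stop(self):
        self.up = False

    def restart(self):
        self.generation += 1
        self.up = True


class FakeConnection:
    """Hypothetical connection that connects lazily on the next read."""

    def __init__(self, server):
        self.server = server
        self.session = None  # None means "no socket open"

    async def connect(self):
        if not self.server.up:
            raise ConnectionError("Connection refused.")
        self.session = self.server.generation

    async def disconnect(self):
        self.session = None

    async def read(self):
        if self.session is None:
            await self.connect()
        # A socket from before a restart is stale and can never recover.
        if not self.server.up or self.session != self.server.generation:
            raise ConnectionError("Connection closed by server.")
        return "message"


async def get_message(conn):
    """Read one message; on ConnectionError, drop the dead socket and re-raise."""
    try:
        return await conn.read()
    except ConnectionError:
        await conn.disconnect()  # the next call will open a fresh connection
        raise


async def demo():
    server = FakeServer()
    conn = FakeConnection(server)
    results = [await get_message(conn)]
    server.stop()
    try:
        await get_message(conn)
    except ConnectionError:
        results.append("error")
    server.restart()
    results.append(await get_message(conn))
    return results
```

The caller still sees the error while the server is down, so custom failure handling remains possible; the only change is that the stale connection is not reused afterwards.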

luhn commented 3 years ago

FWIW, the equivalent issue+PR I opened with redis-py has been merged. https://github.com/andymccurdy/redis-py/issues/1572

Andrew-Chen-Wang commented 3 years ago

@luhn apologies, I never had a chance to respond, especially after I made sense of "Maybe "reconnect" is the wrong word". Thanks for also linking to redis-py. I'll probably be skipping your PR since I'll be making a huge PR port from redis-py tomorrow.

Thanks for creating the issue though!