redis / redis-py

Redis Python client
MIT License

pubsub.get_message() sometimes doesn't receive messages #2871

Open marianaacunhaa opened 1 year ago

marianaacunhaa commented 1 year ago

Version: redis-py 4.5.1, redis-stack

Platform: Python 3.9.12 on Ubuntu 20.04

Description: To give some context, I have a notifier that publishes a user_info dict to Redis every time a user is updated. I can see this working fine using redis-stack.

Then I have a websocket that accepts a user connection and, upon connecting, connects to Redis and subscribes to that user's channel like so:

    async def subscribe_redis(self, channel: str):
        try:
            r = aioredis.from_url(settings.REDIS_HOST)
            self.redis = r

            pubsub = r.pubsub()
            await pubsub.subscribe(channel)

            asyncio.create_task(self.reader(pubsub))

        except Exception as ex:
            logging.info(f"Error while subscribing to Redis: {str(ex)}")

        return True

Upon subscribing to the channel, it waits for messages (in this case, a user_info). The reader function:

    async def reader(self, pubsub: aioredis.client.PubSub):
        while True:
            try:
                if not self.websocket:
                    break

                message = await pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0
                )

                if message is not None:
                    user_info = eval(message["data"])
                    await self.send_user_info(user_info)

            except Exception as ex:
                logging.info(str(ex))
                break

        return True
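
A side note on the `eval(message["data"])` call in the reader: `eval` executes whatever arrives on the channel. Below is a minimal sketch of a safer decode, assuming the notifier publishes either `json.dumps(user_info)` or `str(user_info)` (the publisher code is not shown in this report, so the format is an assumption). `decode_user_info` is a hypothetical helper name, not part of redis-py.

```python
import ast
import json


def decode_user_info(data):
    """Decode a pub/sub payload into a Python object without executing it."""
    # redis-py returns bytes unless the client was created with decode_responses=True.
    text = data.decode("utf-8") if isinstance(data, (bytes, bytearray)) else data
    try:
        # Assumption: the notifier publishes json.dumps(user_info).
        return json.loads(text)
    except json.JSONDecodeError:
        # Fallback for str(user_info): accepts Python literals only and
        # raises ValueError or SyntaxError for anything else.
        return ast.literal_eval(text)
```

In the reader this would replace the `eval` call, for example `user_info = decode_user_info(message["data"])`.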

The expected behavior is: I change a user in my database, the notifier publishes the new user_info to that user's Redis pubsub channel, and get_message() receives the user_info.

However, something weird happens: sometimes it works, sometimes it doesn't, and the outcome is fixed per websocket connection. When a connection works, it always works: get_message() receives every new message. When a connection doesn't work, it never works: get_message() never receives a message from the channel, even though the new message is being successfully published to Redis.

Another weird thing is that when it doesn't work, the Redis connection and the pubsub object are both still there: if I call redis.ping() and pubsub.ping() inside the while loop, I get a response.

Note that I call the subscribe_redis() function like this:

self.task = asyncio.create_task(
    self.subscribe_redis(redis_channel)
)

Is there something I am not seeing?
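
One thing that may be worth ruling out (it is not confirmed as the cause here): the task created for `self.reader(pubsub)` inside `subscribe_redis()` is not stored anywhere, and the asyncio documentation states that the event loop keeps only weak references to tasks, so an unreferenced task can be garbage collected before it finishes. Below is a minimal sketch of keeping strong references and surfacing task failures. `BackgroundTasks` and `spawn` are hypothetical names introduced for illustration.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class BackgroundTasks:
    """Hypothetical helper that holds strong references to running tasks."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro):
        # Keep the task in a set so it cannot be garbage collected mid-run.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        # Drop the reference once the task has finished.
        self._tasks.discard(task)
        # Log failures at ERROR level so they show up under the default logging config.
        if not task.cancelled() and task.exception() is not None:
            logger.error("background task failed", exc_info=task.exception())

    def __len__(self):
        return len(self._tasks)
```

With this, `subscribe_redis()` could call something like `self.background.spawn(self.reader(pubsub))` instead of a bare `asyncio.create_task(...)`. Separately, `logging.info` is dropped under Python's default WARNING level, so an exception caught in `reader` or `subscribe_redis` can end the loop without any visible output unless logging is configured.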

asparagusbeef commented 4 months ago

I am having the same issue. I am subscribed to a channel, and Redis Insight does capture the message in the correct channel, but get_message() sometimes misses messages. This only happens with messages published in quick succession when more than one channel is being listened to asynchronously.

Harsh-Maheshwari commented 3 months ago

Any solution? I am also facing the same problem, even though I am not doing anything async.

asparagusbeef commented 3 months ago

> Any solution? I am also facing the same problem, even though I am not doing anything async.

Because I also control the publishers, and my business needs allow for it, I added a small throttle before each publish to space them out a bit, and that resolved it. But that's definitely a workaround: pubsub should be able to handle millions of ops/sec, and the PubSub object should buffer the messages. I couldn't get it to work otherwise.
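
For anyone who wants to try the same workaround, here is a rough sketch of a publish throttle. It is not the exact code used above, and the interval is an arbitrary placeholder. `ThrottledPublisher` is a hypothetical name; `publish` is assumed to be any async callable taking `(channel, message)`, such as the `publish` method of a `redis.asyncio` client.

```python
import asyncio
import time


class ThrottledPublisher:
    """Hypothetical wrapper that spaces out publishes by a minimum interval."""

    def __init__(self, publish, min_interval=0.01):
        # Create the instance inside a running event loop: on Python 3.9
        # asyncio.Lock binds to the current loop at construction time.
        self._publish = publish
        self._min_interval = min_interval
        self._lock = asyncio.Lock()
        self._last_sent = None

    async def publish(self, channel, message):
        # The lock serializes concurrent callers so the spacing is global.
        async with self._lock:
            if self._last_sent is not None:
                remaining = self._min_interval - (time.monotonic() - self._last_sent)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            result = await self._publish(channel, message)
            self._last_sent = time.monotonic()
            return result
```

Usage would look like `publisher = ThrottledPublisher(r.publish, min_interval=0.01)` followed by `await publisher.publish(channel, payload)`. This only spaces messages out; it does not explain why closely spaced messages are missed.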

Harsh-Maheshwari commented 3 months ago

So what you are saying is that because the publisher is publishing very fast, the subscriber is not getting any messages? So I need to put a sleep between the publishes?

That is not very efficient, as the whole reason I am using pubsub is speed.

Harsh-Maheshwari commented 3 months ago

The Redis Stack UI is able to pick up these publishes without any time delay, so I think there must be a solution to this that does not require time delays.

asparagusbeef commented 3 months ago

That's true. Redis Insight also picks up publishes with no issue when they are very rapid. It's something in the redis-py implementation.