encode / broadcaster

Broadcast channels for async web apps. 📢
BSD 3-Clause "New" or "Revised" License

Subscriber hangs after a Redis server disconnection #144

Open: JoseKilo opened this issue 1 month ago


When I subscribe to events using the Redis backend, any errors raised by the Redis library seem to be swallowed by the subscriber side of broadcaster.

Steps to reproduce

Launch a Redis server (for example using Docker):

docker run -d --name broadcaster_redis_test -p 6379:6379 redis:7.2-alpine

Launch a message consumer:

import asyncio

import broadcaster

async def main() -> None:
    async with broadcaster.Broadcast("redis://:@localhost:6379") as broadcast:
        print("Subscribing")
        async with broadcast.subscribe(channel="channel") as subscriber:
            print("Iterating")
            async for event in subscriber:  # type: ignore[union-attr]
                print("Received:", event)

if __name__ == "__main__":
    asyncio.run(main())

Launch a message publisher:

import asyncio
import itertools

import broadcaster

async def main() -> None:
    async with broadcaster.Broadcast("redis://:@localhost:6379") as broadcast:
        for i in itertools.count():
            print(f"Publishing {i}")
            await broadcast.publish(channel="channel", message=f"message {i}")
            await asyncio.sleep(1)
            print(f"Published {i}")

if __name__ == "__main__":
    asyncio.run(main())

Stop the Redis server:

docker stop broadcaster_redis_test

Observed behaviour

The publisher stops with a ConnectionError:

redis.exceptions.ConnectionError: Error Multiple exceptions: [Errno 111] Connect call failed ('::1', 6379, 0, 0), [Errno 111] Connect call failed ('127.0.0.1', 6379) connecting to localhost:6379.

The consumer just hangs as if everything were fine: no new messages are received, and no exception reaches the caller code on the consumer side.
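To make the symptom concrete, here is a minimal pure-asyncio model of it. It is a sketch, not broadcaster's actual code: it assumes the subscriber reads from an asyncio.Queue fed by a background task, and that this task dies with an exception when the connection drops. The names reader, consume and main are hypothetical. The builtin ConnectionError stands in for redis.exceptions.ConnectionError.

```python
import asyncio


async def reader(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for the backend listener task: it forwards one
    # message, then the "connection" drops and the task dies.
    await queue.put("message 0")
    raise ConnectionError("Redis went away")


async def consume(queue: asyncio.Queue, received: list) -> None:
    # Hypothetical stand-in for `async for event in subscriber`.
    while True:
        received.append(await queue.get())


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    # Fire-and-forget: nothing observes this task's result or exception.
    reader_task = asyncio.create_task(reader(queue))
    try:
        # The timeout only exists so the demo terminates; without it the
        # consumer would wait forever, like the real subscriber.
        await asyncio.wait_for(consume(queue, received), timeout=0.5)
    except asyncio.TimeoutError:
        print("consumer still waiting, received:", received)
    # The error existed all along, stored on the task object nobody checked.
    failure = reader_task.exception()
    print("reader task failed with:", repr(failure))
    return received, failure


if __name__ == "__main__":
    asyncio.run(main())
```

The consumer sees the first message and then blocks, while the ConnectionError sits unobserved on the finished task.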

If the server comes back online:

docker start broadcaster_redis_test

The subscriber stays stuck in the same state regardless.

Even if we restart the publisher, the subscriber never does anything else: it cannot receive new messages, yet it still raises no exceptions.

Expected behaviour

I'd expect both sides of the communication to receive an exception when a connection error happens, and the same for any other error raised by the Redis library that the backend doesn't explicitly handle. That way, if the library can't recover from the network issue itself (by reconnecting, etc.), the caller code could at least try to do so, or handle it in some other way depending on its business logic.
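If errors did propagate, the caller could apply its own recovery policy. The sketch below assumes a hypothetical helper, consume_with_retry, that takes a zero-argument factory returning a fresh async iterator of messages (in practice, perhaps a generator that enters a new Broadcast and subscription on each call) and reconnects with exponential backoff. The builtin ConnectionError is a stand-in; with redis-py you would catch redis.exceptions.ConnectionError instead, since it does not subclass the builtin.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def consume_with_retry(
    open_stream: Callable[[], AsyncIterator[str]],
    handle: Callable[[str], Awaitable[None]],
    max_attempts: int = 5,
    base_delay: float = 0.1,
) -> None:
    """Hypothetical caller-side policy: reconnect with exponential backoff.

    This only helps if connection errors actually propagate out of the
    stream, which is what this issue asks for.
    """
    attempt = 0
    while True:
        try:
            async for message in open_stream():
                attempt = 0  # a successful read resets the backoff
                await handle(message)
            return  # the stream ended normally
        except ConnectionError:
            attempt += 1
            if attempt >= max_attempts:
                raise  # give up and let business logic decide
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Resetting the attempt counter after each successful read means only consecutive failures count toward the limit.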

Comments

I think the issue is caused by two similar points of failure:

https://github.com/encode/broadcaster/blob/69cf29a41066f53a45498f0dfa36288befd73dd7/broadcaster/_base.py#L71
https://github.com/encode/broadcaster/blob/69cf29a41066f53a45498f0dfa36288befd73dd7/broadcaster/_backends/redis.py#L21

The tasks created there with asyncio.create_task(...) are never observed, so if they stop or fail, nothing notices. By using add_done_callback in both places, I was able to pass the exception from the backend up to the Broadcast class and then re-raise it from there.
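A minimal sketch of the add_done_callback idea, using plain asyncio rather than broadcaster's real classes. Subscriber, deliver, watch and fake_listener are hypothetical names. The done callback pushes a wrapped exception into the same queue the consumer reads from, so the next iteration re-raises it in the caller's context instead of blocking forever.

```python
import asyncio

_END = object()  # sentinel: the feeder task finished without an error


class _Failure:
    # Wraps an exception so it can't be confused with a regular message.
    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class Subscriber:
    """Hypothetical subscriber whose feeder task is observed, not forgotten."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def deliver(self, message: str) -> None:
        self._queue.put_nowait(message)

    def watch(self, task: asyncio.Task) -> None:
        # The key change: get notified when the background task stops.
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        if not task.cancelled() and task.exception() is not None:
            self._queue.put_nowait(_Failure(task.exception()))
        else:
            self._queue.put_nowait(_END)

    def __aiter__(self) -> "Subscriber":
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if isinstance(item, _Failure):
            raise item.exc  # re-raise in the consumer's context
        if item is _END:
            raise StopAsyncIteration
        return item


async def fake_listener(sub: Subscriber) -> None:
    # Hypothetical backend reader: one message, then the connection drops.
    sub.deliver("message 0")
    raise ConnectionError("Redis went away")


async def main() -> list:
    sub = Subscriber()
    sub.watch(asyncio.create_task(fake_listener(sub)))
    received = []
    try:
        async for event in sub:
            received.append(event)
    except ConnectionError as exc:
        print("caller saw:", repr(exc))
        received.append("ConnectionError")
    return received
```

Routing the failure through the queue keeps ordering intact: messages delivered before the drop are still consumed before the error is raised.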

I've got half a fix ready, but it's tricky to test. If everything above makes sense, I can try to get it ready for a PR.
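One way to test this kind of fix without a running Redis server, sketched under assumptions: replace the pub/sub client with unittest.mock.AsyncMock configured to raise, and bound the wait with asyncio.wait_for so a regression shows up as a timeout instead of a hung test run. reader_loop, next_event and check_error_surfaces are hypothetical. For brevity, next_event swaps in a different propagation technique (racing queue.get() against the reader task with asyncio.wait) rather than add_done_callback.

```python
import asyncio
from unittest.mock import AsyncMock


async def reader_loop(pubsub, queue: asyncio.Queue) -> None:
    # Hypothetical backend loop: poll a pub/sub client, forward messages.
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message is not None:
            await queue.put(message)


async def next_event(queue: asyncio.Queue, reader: asyncio.Task):
    # Wait for a message OR the reader task finishing, whichever is first.
    getter = asyncio.ensure_future(queue.get())
    done, _ = await asyncio.wait(
        {getter, reader}, return_when=asyncio.FIRST_COMPLETED
    )
    if getter in done:
        return getter.result()
    getter.cancel()
    return reader.result()  # re-raises the reader's exception, if any


async def check_error_surfaces() -> str:
    # No Redis server needed: the mocked client fails on the first call.
    pubsub = AsyncMock()
    pubsub.get_message.side_effect = ConnectionError("Redis went away")
    queue: asyncio.Queue = asyncio.Queue()
    reader = asyncio.create_task(reader_loop(pubsub, queue))
    try:
        # Bound the wait so a regression fails fast instead of hanging CI.
        await asyncio.wait_for(next_event(queue, reader), timeout=1.0)
    except ConnectionError:
        return "surfaced"
    except asyncio.TimeoutError:
        return "hung"
    return "no error"
```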

Versions used

> pip freeze | grep -i broadcaster
broadcaster==0.3.1
> python --version
Python 3.12.3