Tronic / redio

Redis client for Python Trio
22 stars 5 forks source link

Retrieving from pubsub blocks? #4

Closed zgoda closed 2 years ago

zgoda commented 3 years ago

I have 2 tasks started by nursery that process communication from different channels, one of them is PubSub reader. The code fetches messages one by one in tight loop:

while True:
    # await trio.sleep(0)
    if ws.closed:
        registry.remove(client_id=client)
        break
    user = registry.get(client_id=client)
    if user:
        message = await user.collect_message()
        payload = message.message
        payload['topic'] = message.topic
        try:
            await ws.send_message(json.dumps(payload))
        except ConnectionClosed:
            registry.remove(client_id=client)
            break

Fetch code:

    async def collect_message(self) -> Message:
        topic, message = await self._pubsub
        return Message(topic, message)

Once reader task starts executing it blocks the whole execution until 1st message arrives. I was able to overcome this by adding await trio.sleep(0) at the beginning of loop but I am not sure if it's a bug or I'm doing something stupid here. Tried to implement collect_message as async generator but the result was the same.

Any hints?

Tronic commented 3 years ago

Can you reproduce this hang without using websockets? I recall that there was something funny with how the websockets module (and as wrapped in many web frameworks) communicates over network, such that awaiting an extra zero-sleep would cure hanging problems. I'll have a closer look if you still think this is related to redio.

Tronic commented 2 years ago

@zgoda Is this issue still relevant?

zgoda commented 2 years ago

I don't think so, the code works OK with that trio.sleep(0) workaround.