python-websockets / websockets

Library for building WebSocket servers and clients in Python
https://websockets.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
5.16k stars 513 forks source link

handling multiple websocket client connections #1460

Closed gborrageiro closed 4 months ago

gborrageiro commented 4 months ago

What would be your suggestion from handling multiple websocket client connections simultaneously? For example, say I have two websocket servers that I need to consume simultaneously. The recipe on https://websockets.readthedocs.io/en/stable/howto/patterns.html shows a message iterator:

async def consumer_handler(websocket):
    async for message in websocket:
        await consumer(message)

Furthermore, the message iterator would typically be wrapped in a context manager:

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

I suppose I could nest the two context managers, but would probably have to avoid message iteration, right? If you could point me in the direction of some reference code, that would be greatly appreciated. Finally, if I can avoid using multiple threads, I will.

Thanks Gabriel

aaugustin commented 4 months ago

You need this: https://websockets.readthedocs.io/en/stable/faq/asyncio.html#how-do-i-run-two-coroutines-in-parallel

aaugustin commented 4 months ago

You have to run multiple coroutines (not threads!)

gborrageiro commented 4 months ago

I get that I can gather tasks or use a task group. Your patterns page shows a single consumer / producer with iteration over a single client. Clearly that iteration over a single websocket client's messages will not work for the case where there are two or more websocket clients. How about something like this?

async def main():
    async for ws1 in websockets.connect(uri=uri1):
        async for ws2 in websockets.connect(uri=uri2):
            try:
                async with asyncio.TaskGroup() as tg:
                    tg.create_task(consumer1(ws1.recv()))
                    tg.create_task(consumer2(ws2.recv()))
            except websockets.ConnectionClosed:
                await asyncio.sleep(0.)
                continue
aaugustin commented 4 months ago

Yes that's the sort of code that you need, but not exactly correct.

The key point here is to two tasks that call recv() in parallel on the two connections. tg.create_task is a good way to do this.

The call to recv() must be inside the tasks. I'd put await ws1.recv() inside the definition of consumer1. (Technically, your code can work if consumer1 awaits its first argument but that's a rather weird way to do it. Also it lets you receive only one massage, while you want to interate.)