python-websockets / websockets

Library for building WebSocket servers and clients in Python
https://websockets.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
5.23k stars 519 forks source link

Running Scheduled Task in Websokcets Or Scaling with multiple workers #1467

Closed suryanarayanan035 closed 3 months ago

suryanarayanan035 commented 6 months ago

Scenario

Hello there, Thanks for creating this cool and elegant library. I am building a chat application using this library. In my application I want to ping the client for every 10 seconds to verify whether they are connected still. But the issue is I cannot do so.

What I have tried

I tried to create an async function which will sleep for 10 seconds then call websockets.ping method. But this can't be used because if I use asyncio.sleep then I need to use await which will block the codes below the function call.

Expectation

So, I thought of moving the pinging work to a separate worker. So my main server, will handle the connections and my worker will do the pinging. To do so, I have to serialize/pickle the websocket object which is not possible.

So, I would like to know a way to scale websockets with multiple worker or running interval based tasks without blocking the main process.

Here is my server code:

async def handler(websocket):
    user = save_user_data_on_connect(websocket)
    CURRENT_CONNECTIONS[user.get_id()] = websocket
    status, pair = pair_user(user)
    if status == 'paired':
        message = json.dumps(construct_message_dict('connected'))
        await CURRENT_CONNECTIONS[pair].send(message)
        await websocket.send(message)
*** this is where the logic for start the scheduled job will come***
        logging.info(f"connected msg sent to user {websocket.id}")
    async for message in websocket:
        receiver = get_user_pair(str(websocket.id))
        try:
            print(f"Message Received: {message}")
            message = parse_message(message)
            message['type'] = 'user_sent'
            if receiver and 'content' in message:
                message = save_message_to_db(str(websocket.id), receiver, message)
                await CURRENT_CONNECTIONS[receiver].send(message)
        except websockets.exceptions.ConnectionClosed:
            cleanup_disconnected_connection(receiver)
            message = json.dumps(construct_message_dict('disconnected'))
            await CURRENT_CONNECTIONS[str(websocket.id)].send(message)
        except Exception as e:
            print(e)

Happy to provide more information if needed. Thanks in adavance!

aaugustin commented 6 months ago

So, I thought of moving the pinging work to a separate worker. So my main server, will handle the connections and my worker will do the pinging. To do so, I have to serialize/pickle the websocket object which is not possible.

"worker" in this context is "asyncio coroutine". Coroutines share the same memory space. Therefore, you don't need to serialize/pickle.

(Besides, you cannot (easily) share sockets between processes. Therefore, generally speaking, serialize/pickle a network connection with the intent of sharing it never makes sense. If it's the same process, you don't need to serialize. If it's in a different process, you cannot share.)

Are you aware the websockets already does this for you and you can simply tune the parameters as you wish?

e.g. serve(ping_interval=10, ping_timeout=5) if you think that latency > 5s means the connection is dead