tarasko / picows

Ultra-fast websocket client and server for asyncio
MIT License

Does picows have a heartbeat mechanism? #2

Closed River-Shi closed 2 months ago

River-Shi commented 2 months ago

Some servers, like the OKX WebSocket server, require clients to send a ping message every 30 seconds:

To maintain a stable connection:

  1. Set a timer for N seconds upon receiving a response, where N is less than 30.
  2. If the timer expires without receiving a new message, send the string 'ping'.
  3. Expect a 'pong' reply. If no response is received within N seconds, raise an error or reconnect.

Similarly, the Bybit WebSocket server recommends sending a ping heartbeat every 20 seconds to prevent network or program issues.

How can this be achieved using picows? I previously used websockets, which handled this automatically. I would really appreciate it if you could provide an example of how to implement this mechanism.

tarasko commented 2 months ago

At the moment, picows provides only a low-level interface and doesn't have "high-level" features such as an automatic ping-pong mechanism.

The easiest approach is to create a task that sends the exchange-specific ping in a loop and, on each iteration, checks whether the pong for the previous ping was received.

This can be improved (if necessary) to skip the ping when the last message was received less than N seconds ago. But for detecting a stale connection, imho, even this simple approach is good enough.

Unlike with aiohttp and probably websockets (I haven't used it), you have full control over the ping-pong exchange, so you can log and troubleshoot disconnects more easily. At least this was the case for me: I sometimes didn't understand why aiohttp was reporting disconnects.

import asyncio

from picows import WSCloseCode, WSFrame, WSListener, WSMsgType, WSTransport


class OKXListener(WSListener):
    _transport: WSTransport
    _ping_loop_task: asyncio.Task
    _ping_delay: float
    _waiting_for_pong: bool

    def on_ws_connected(self, transport: WSTransport):
        self._transport = transport
        # Initialize the state before starting the ping loop that reads it
        self._ping_delay = 20
        self._waiting_for_pong = False
        self._ping_loop_task = asyncio.create_task(self._ping_loop())

    def on_ws_frame(self, transport: WSTransport, frame: WSFrame):
        if frame.msg_type == WSMsgType.PING:
            self._transport.send_pong(frame.get_payload_as_bytes())
            return

        if self.is_exchange_specific_pong(frame):
            # Pong arrived: clear the flag so the ping loop doesn't disconnect
            self._waiting_for_pong = False
            return

        # Process message
        # ...

    def on_ws_disconnected(self, transport: WSTransport):
        self._ping_loop_task.cancel()

    async def _ping_loop(self):
        while True:
            await asyncio.sleep(self._ping_delay)

            if self._waiting_for_pong:
                self._transport.send_close(WSCloseCode.GOING_AWAY, b"no pong received during required interval")
                self._transport.disconnect()
                return
            else:
                self.send_exchange_specific_ping()
                self._waiting_for_pong = True

    def send_exchange_specific_ping(self):
        # OKX uses a non-standard ping-pong mechanism: plain-text "ping"/"pong" messages
        self._transport.send(WSMsgType.TEXT, b"ping")

        # In other cases just send a standard PING message
        # self._transport.send_ping()

    def is_exchange_specific_pong(self, frame: WSFrame) -> bool:
        # For OKX
        if frame.msg_type == WSMsgType.TEXT and frame.get_payload_as_memoryview() == b"pong":
            return True

        # For websockets that are using standard PING-PONG control messages
        # if frame.msg_type == WSMsgType.PONG:
        #     return True

        return False

I haven't run or tested this code.
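
The improvement mentioned above (only pinging when nothing has been received for N seconds) could be sketched roughly as follows. This is a minimal, transport-agnostic sketch, not part of picows: `IdleHeartbeat`, `send_ping`, `on_timeout` and `on_message` are hypothetical names, and it assumes that any incoming message, not only a pong, counts as proof that the connection is alive.

```python
import asyncio
import time
from typing import Callable, Optional


class IdleHeartbeat:
    """Hypothetical helper (not a picows API): pings only after an idle period."""

    def __init__(self, idle_timeout: float,
                 send_ping: Callable[[], None],
                 on_timeout: Callable[[], None]):
        self._idle_timeout = idle_timeout
        self._send_ping = send_ping      # e.g. sends the exchange-specific ping
        self._on_timeout = on_timeout    # e.g. closes the connection
        self._last_rx = time.monotonic()
        self._waiting_for_pong = False
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Must be called from within a running event loop
        self._last_rx = time.monotonic()
        self._waiting_for_pong = False
        self._task = asyncio.create_task(self._run())

    def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            self._task = None

    def on_message(self) -> None:
        # Call for every incoming message, including the pong itself
        self._last_rx = time.monotonic()
        self._waiting_for_pong = False

    async def _run(self) -> None:
        while True:
            idle = time.monotonic() - self._last_rx
            if idle < self._idle_timeout:
                # Something arrived recently: sleep until the idle deadline
                await asyncio.sleep(self._idle_timeout - idle)
                continue

            if self._waiting_for_pong:
                # A ping was sent and nothing came back within the timeout
                self._on_timeout()
                return

            self._send_ping()
            self._waiting_for_pong = True
            await asyncio.sleep(self._idle_timeout)
```

With the listener above, one would presumably call `start()` from `on_ws_connected`, `on_message()` at the top of `on_ws_frame`, and `stop()` from `on_ws_disconnected`, passing `send_exchange_specific_ping` as `send_ping` and a function that closes the transport as `on_timeout`. Like the code above, the picows wiring is untested.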