python-trio / trio-websocket

WebSocket client and server implementation for Python Trio
MIT License
70 stars 25 forks source link

Sending `ping` conflicts with `send_message` #163

Closed p-i- closed 3 years ago

p-i- commented 3 years ago

The following code sends a message upon connect. It listens for messages, while also sending a ping at regular intervals to keep the socket alive.

Adding the keepalive causes it to crash.

import json
import gzip

import trio
from trio_websocket import open_websocket_url

import threading

from time import sleep

# import traceback

# from utils import tprint

class WebSocket:
    KEEPALIVE_INTERVAL_S = 10

    def __init__(self, ws_url, on_connect, on_msg):
        self.ws_url = ws_url
        self.on_connect = on_connect
        self.on_msg = on_msg

    async def run(self):
        while True:
            try:
                async with open_websocket_url(self.ws_url) as ws:
                    print(f'{self.ws_url} connected')

                    self.ws = ws

                    async with trio.open_nursery() as nursery:
                        async def keepalive():
                            while True:
                                print('sending ping')
                                await ws.ping()
                                await trio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)

                        async def recv():
                            while True:
                                msg = await ws.get_message()

                                if isinstance(msg, bytes):
                                    try:
                                        msg = msg.decode('utf8')
                                    except:
                                        print('DECODE FAIL')
                                        print(msg)
                                print('GOT MESSAGE', msg)

                                return json.loads(msg)

                        nursery.start_soon(self.on_connect, ws)
                        nursery.start_soon(recv)
                        nursery.start_soon(keepalive)

            except Exception as e:
                print('⛔️ WebSocket disconnect')
                await trio.sleep(5)
                print('Attempting reconnect')

        print('👋 from trio_main')

WS_URL = 'wss://ws.hotbit.io'

if __name__ == '__main__':

    async def trio_main():
        async def on_connect(ws):
            print('sendingJSON')
            await ws.send_message(
                json.dumps({
                    'method': 'deals.subscribe',
                    'params': ['ETHUSDT','BTCUSDT'],
                    'id': 123
                })
            )

        async def on_msg(msg):
            print('GOT MSG', msg)

        ws = WebSocket(WS_URL, on_connect, on_msg)

        print('Running')
        await ws.run()

        print('👋 from trio_main')

    def trio_threadfunc():
        trio.run(trio_main)
        print('👋 from trio_threadfunc')

    trio_thread = threading.Thread(
        name = 'aio_thread',
        target = trio_threadfunc,
        daemon = True,
        # args = (foo,)
        )

    trio_thread.start()

    print('Entering mainloop')
    while True:
        print('mainTick')
        sleep(1)

Here's the crashdump:

> ./WebSocket_Trio.py 
Entering mainloop
mainTick
Running
 2021-06-12 17:55:47,539             _impl.py:169    connect_websocket() Connecting to wss://ws.hotbit.io:443/
 2021-06-12 17:55:47,563             _impl.py:1251                _send() client-0 sending 151 bytes
 2021-06-12 17:55:48,129             _impl.py:1227         _reader_task() client-0 received 516 bytes
 2021-06-12 17:55:48,130             _impl.py:1204         _reader_task() client-0 received event: <class 'wsproto.events.AcceptConnection'>
 2021-06-12 17:55:48,131    WebSocket_Trio.py: 42                  run() wss://ws.hotbit.io connected
sendingJSON
sending ping
 2021-06-12 17:55:48,132             _impl.py:1251                _send() client-0 sending 80 bytes
 2021-06-12 17:55:48,133             _impl.py:1251                _send() client-0 sending 10 bytes
 2021-06-12 17:55:48,247             _impl.py:1227         _reader_task() client-0 received 76 bytes
 2021-06-12 17:55:48,248             _impl.py:1204         _reader_task() client-0 received event: <class 'wsproto.events.BytesMessage'>
wtf
b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xabVJ-*\xca/R\xb2R\xc8+\xcd\xc9\xd1QP*J-.\xcd)\x01\xf2\xab\x95\x8aK\x12KJ\x8b\x81L\xa5\xe2\xd2\xe4\xe4\xd4\xe2b\xa5Z\xa0\x82\xcc\x14\xa0\x88\xa1\x91q-\x00k\x14\xb2(;\x00\x00\x00'
GOT MESSAGE b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xabVJ-*\xca/R\xb2R\xc8+\xcd\xc9\xd1QP*J-.\xcd)\x01\xf2\xab\x95\x8aK\x12KJ\x8b\x81L\xa5\xe2\xd2\xe4\xe4\xd4\xe2b\xa5Z\xa0\x82\xcc\x14\xa0\x88\xa1\x91q-\x00k\x14\xb2(;\x00\x00\x00'
 2021-06-12 17:55:48,250             _impl.py:1227         _reader_task() client-0 received 1981 bytes
 2021-06-12 17:55:48,250             _impl.py:1204         _reader_task() client-0 received event: <class 'wsproto.events.BytesMessage'>
 2021-06-12 17:55:48,250             _impl.py:1251                _send() client-0 sending 8 bytes
 2021-06-12 17:55:48,251             _impl.py:1227         _reader_task() client-0 received 1962 bytes
 2021-06-12 17:55:48,251             _impl.py:1204         _reader_task() client-0 received event: <class 'wsproto.events.BytesMessage'>
 2021-06-12 17:55:48,255             _impl.py:1227         _reader_task() client-0 received 2 bytes
 2021-06-12 17:55:48,255             _impl.py:1204         _reader_task() client-0 received event: <class 'wsproto.events.Pong'>
 2021-06-12 17:55:48,365             _impl.py:1221         _reader_task() client-0 received zero bytes (connection closed)
 2021-06-12 17:55:48,366             _impl.py:1019    _close_web_socket() client-0 websocket closed ConnectionClosed<CloseReason<code=1006, name=ABNORMAL_CLOSURE, reason=None>>
 2021-06-12 17:55:48,366             _impl.py:1237         _reader_task() client-0 reader task finished
⛔️ WebSocket disconnect
mainTick

Simply removing the on_connect message fixes:

        async def on_connect(ws):
            print('sendingJSON')
            # await ws.send_message(
            #     json.dumps({
            #         'method': 'deals.subscribe',
            #         'params': ['ETHUSDT','BTCUSDT'],
            #         'id': 123
            #     })
            # )

Note that get_message isn't picking up the pong response.

So somehow send_message and ping are interfering with one another.

I thought it might be a timing issue, that both sends are happening at the same time. If that was the case this would resolve it:

                                # reverse the order, so sleep BEFORE pinging
                                await trio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)
                                await ws.ping()

But it doesn't.

Is this a known issue? Or a PEBCAK my end...

andersea commented 3 years ago

It isn't crashing. The remote server just doesn't like you or what you are doing to it, and is closing the connection.

 2021-06-12 17:55:48,365             _impl.py:1221         _reader_task() client-0 received zero bytes (connection closed)
 2021-06-12 17:55:48,366             _impl.py:1019    _close_web_socket() client-0 websocket closed ConnectionClosed<CloseReason<code=1006, name=ABNORMAL_CLOSURE, reason=None>>
p-i- commented 3 years ago

My bad, I was failing to gzip-decompress the payload. Nothing to do with ping.