sammchardy / python-binance

Binance Exchange API python implementation for automated trading
https://python-binance.readthedocs.io/en/latest/
MIT License
6.01k stars 2.2k forks

Async websocket queue suddenly increases and overflow #1295

Open stephanerey opened 1 year ago

stephanerey commented 1 year ago

Describe the bug A very simple async routine that connects to the trade socket (or any other socket) and monitors the receive queue length runs correctly for a while, with the queue length at 0. Then the queue length suddenly increases and overflows (at 100), which leads to the socket being closed.
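The symptom described above (a queue that sits at 0 and then suddenly fills) is what a bounded queue does when its consumer stops reading while messages keep arriving. The following is a minimal simulation of that mechanism using only `asyncio`; it is not python-binance code. `simulate`, `total` and `stall_after` are hypothetical names, and the cap of 100 is taken from the report rather than from the library source.

```python
import asyncio

MAX_QUEUE_SIZE = 100  # assumption: mirrors the overflow threshold of 100 reported above


async def simulate(total: int, stall_after: int) -> dict:
    """Feed `total` messages to a consumer that stops reading after `stall_after`."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
    consumed = 0
    dropped = 0
    peak = 0

    async def consumer() -> None:
        nonlocal consumed
        while consumed < stall_after:
            await queue.get()
            consumed += 1
        # From here on the consumer is "busy" (for example stuck in a slow
        # handler) and never reads from the queue again.
        await asyncio.Event().wait()

    task = asyncio.create_task(consumer())
    for i in range(total):
        try:
            queue.put_nowait({"id": i})
        except asyncio.QueueFull:
            dropped += 1  # the bounded queue has overflowed
        peak = max(peak, queue.qsize())
        await asyncio.sleep(0)  # give the consumer a chance to run
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return {"consumed": consumed, "peak": peak, "dropped": dropped}


if __name__ == "__main__":
    print(asyncio.run(simulate(total=200, stall_after=50)))
```

While the consumer keeps up, the queue never holds more than one message; once it stalls, the queue climbs to the cap and every later message is dropped. Whether this is what happens in the report depends on what blocks the event loop there, which the simulation does not try to answer.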

To Reproduce

import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel
from PyQt5.QtCore import pyqtSignal, QObject
import asyncio
import asyncqt
from binance import AsyncClient, BinanceSocketManager

class BinanceWebSocket(QObject):
    new_data = pyqtSignal(dict)

    async def connect_to_binance_websocket(self, symbol):
        client = await AsyncClient.create()
        bsm = BinanceSocketManager(client)
        socket = bsm.trade_socket(symbol)
        async with socket as trade_stream:
            while True:
                res = await trade_stream.recv()
                print('queue length:', trade_stream._queue.qsize())
                self.new_data.emit(res)

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.label = QLabel(self)
        self.label.setGeometry(50, 50, 500, 30)

        self.binance_websocket = BinanceWebSocket()
        self.binance_websocket.new_data.connect(self.handle_new_data)

        loop = asyncio.get_event_loop()
        loop.create_task(self.binance_websocket.connect_to_binance_websocket('btcusdt'))

    def handle_new_data(self, data):
        self.label.setText(str(data))
        # Error payloads carry a string in 'm'; regular trade events use the
        # same key for the boolean "is the buyer the market maker" flag.
        if 'e' in data:
            if data['m'] == 'Queue overflow. Message not filled':
                print("Socket queue full. Resetting connection.")
                # self.reset_socket()
                return
            elif data['m'] is not True and data['m'] is not False:
                print(f"Stream error: {data['m']}")
                # exit(1)

if __name__ == '__main__':
    app = QApplication(sys.argv)
    loop = asyncqt.QEventLoop(app)
    asyncio.set_event_loop(loop)
    mainWindow = MainWindow()
    mainWindow.show()
    with loop:
        loop.run_forever()
    sys.exit(app.exec_())

Expected behavior The queue should not overflow, and/or there should be a flow-control mechanism that lets the client ask the API to stop sending until the client is ready again.
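The flow control asked for here would have to come from the server side. As a client-side approximation, one option is to keep the receive loop as light as possible and shed load locally: the loop only copies each message into a small buffer that keeps the newest entries, and the slow work (such as updating a label) reads from that buffer at its own pace. The sketch below assumes that dropping stale trade messages is acceptable. `LatestBuffer`, `reader` and `fake_recv` are hypothetical names; `fake_recv` stands in for `trade_stream.recv()` from the reproduction code.

```python
import asyncio
from collections import deque


class LatestBuffer:
    """Keep only the newest `maxlen` messages; older ones are discarded."""

    def __init__(self, maxlen: int = 1000) -> None:
        # Create instances inside a running coroutine so the Event is bound
        # to the active event loop on older Python versions.
        self._items = deque(maxlen=maxlen)
        self._ready = asyncio.Event()

    def push(self, msg) -> None:
        self._items.append(msg)
        self._ready.set()

    async def pop_all(self) -> list:
        """Wait until at least one message is buffered, then take them all."""
        await self._ready.wait()
        items = list(self._items)
        self._items.clear()
        self._ready.clear()
        return items


async def reader(recv, buffer: LatestBuffer, n: int) -> None:
    """Drain `n` messages as fast as possible; no slow work happens here."""
    for _ in range(n):
        buffer.push(await recv())


async def demo() -> list:
    counter = 0

    async def fake_recv() -> dict:
        # Hypothetical stand-in for `await trade_stream.recv()`.
        nonlocal counter
        counter += 1
        await asyncio.sleep(0)
        return {"id": counter}

    buffer = LatestBuffer(maxlen=5)
    await reader(fake_recv, buffer, 20)  # 20 messages arrive while the handler is busy
    return await buffer.pop_all()        # the handler then sees only the newest 5


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This trades completeness for responsiveness: nothing tells the exchange to slow down, so it only helps when the bottleneck is per-message processing in the client rather than the event loop itself being blocked.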

Karlheinzniebuhr commented 7 months ago

Any solution to this yet?