LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket API`s (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/
Other
683 stars 164 forks source link

InvalidStateError and Limits wrong #105

Closed h5tec closed 3 years ago

h5tec commented 4 years ago

Hi, I have still the asyncio.base_futures' has no attribute 'InvalidStateError' issue in Version 1.16.7 using python 3.8.5. As I can see the data stream is hanging every second as well.

I don't use the buffer as i use own function:

process_stream_data = self.process_stream.

Any stream received by this function is forwarded to to an async function like:

if sid == self.ticker_sid : asyncio.create_task(self.sort_ticker(msg))

I also mentioned an issue with number of subscriptions, as may someone got it wrong by Binance docu. Binance say 1024 Streams per Connection and NOT 1024 subscribtion to a Stream. As I found out the limit of subscribtions per stream is rd. 684.

I use the ticker to get 24h volumes from pairs and only add pairs to subscriptions when volume is greater than {'btc':1000,'eth':1000,'bnb':10000,'usdt':10000000}. Therefore, I have rd. 135 pairs adding to streams and must use any channel in extra stream as all in one multistream not work, because any channels with 135 pairs is more than 684 and stream will break.

I started writing own async websocket client for Binance using tornado framework, After 3 days I found your project and this save lot of time for me. Many thanks for your great work. For now you deal with threads and i thought to wrap everything in asyncio. Is there any reason why you prefer threads?

MichaQ

oliver-zehentleitner commented 4 years ago

I really really recommend to use the stream buffer! With the call back function you are running into problems very easy and cant really trace them... you get blind to errors within the self.process_stream. (I dont know why, but had that problems and others too)

stream_buffer is much more stable, you receive and put the data to stream buffer and the loop starts again. thats it. Its not depending on the speed of any other resource like mysql or what ever.

But: If you use the process class, then think to this: the process class you provide is started after every await receive(), which already is triggered asynchronous. but then, there is no need for more async code i think.

Do you receive the InvalidStateError also with the Streambuffer? Where excatly does it happen? Do you have a tracelog or log entry?

I work with threads AND asyncio.

I create a coroutine for every single stream/websocket connection within its own thread... Otherwise create_stream() would be a blocking function for example

oliver-zehentleitner commented 4 years ago

As I found out the limit of subscribtions per stream is rd. 684.

I have streams with 800 subscriptions and more... why do you think this is the limit? how did you determine this?

h5tec commented 4 years ago

Hi, thanks for your quick answer. I just figured out, all the problems raised up by not using the excellent stream buffer. I admit to your advise to use stream buffer. However, I still use async function but in other way. As I can use Async Tornado Framework to build periodic callbacks instead of a while True: loop.

from time import time as ts
from tornado.ioloop import IOLoop, PeriodicCallback
from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager

class BinanceBot(object):

    def __init__(self,exchange='binance.com'):
        self.stream = BinanceWebSocketApiManager(exchange = self.exchange)
        PeriodicCallback(self.process_buffer,10).start()

    async def process_buffer(self,debug=None):

        try:

            start = ts()

            if debug is None: debug = self.debug

            if self.stream:

                buffer = self.stream.pop_stream_data_from_stream_buffer()

                if buffer:

                    try: data = json.loads(buffer)
                    except:
                        if self.debug: self.log(30,'unkown data in stream buffer','stream')
                        return

                    if   isinstance(data, list):
                        if 'e' in data[0]:
                            if data[0]['e'] == '24hrTicker': asyncio.create_task(self.sort_ticker(data))
                        else: print('Unkown List',type(data),data)
                    elif isinstance(data, dict):
                        if   'result' in data: return
                        elif 'error'  in data:
                            if debug: self.log(40,'Error code {} in Stream {} : {}!'.format(data['error']['code'],data['id'],data['error']['msg']),'buffer')
                        elif 'stream' in data:
                            stream = data['stream'].split('@')[1].lower()
                            if    stream == 'depth':      asyncio.create_task(self.sort_obooks(data['data']))
                            elif  stream == 'bookticker': asyncio.create_task(self.sort_prices(data['data']))
                            elif  stream == 'trade':      asyncio.create_task(self.sort_trades(data['data']))
                            elif  stream == 'aggtrade':   asyncio.create_task(self.sort_trades(data['data']))
                            elif  'kline' in stream:      asyncio.create_task(self.sort_klines(data['data']))
                            else: print('Unkown Stream:',stream)
                        else: print('Unkown Dict',type(data),data)
                    else: print('Unkown Data',type(data),data)

                await self.ptrecord(inspect.stack()[0][3].strip('<>'),start)

        except Exception as e: self.log(30,'{} : {}'.format(inspect.stack()[0][3].strip('<>'),e),'buffer')

    async def sort_ticker(self,data,debug=None): pass # do what you want with
    async def sort_prices(self,data,debug=None): pass # do what you want with
    async def sort_obooks(self,data,debug=None): pass # do what you want with
    async def sort_trades(self,data,debug=None): pass # do what you want with
    async def sort_klines(self,data,debug=None): pass # do what you want with

async def init():
    print('App started')
    bot=BinanceBot()

IOLoop.current().add_callback(init)
IOLoop.current().start()

ptrecord is just a function to record process times in pandas DataFrame

PeriodicCallback works very well, as just run once. If async function still runs, new PeriodicCallback skipped. So just when finished it starts after 10ms again. Much Much better than any other Loop Solutions.

And of course, all issues disappeared. Looks really stable now, Many thanks

The subcription limit of 684 just raised up by error message [CODE:4]: To many Subscriptions which was received from server.

Have a nice day

MichaQ

h5tec commented 4 years ago
2020/09/04 13:36:55 DEBUG    STREAM   DEV          BINANCE Stream apikey and secret set
2020/09/04 13:36:55 DEBUG    STREAM   DEV          BINANCE Stream connected
2020/09/04 13:36:55 DEBUG    STREAM   DEV          Ticker Stream created
2020/09/04 13:36:58 DEBUG    TICKER   DEV          [now:122] found 122 new pairs 
2020/09/04 13:36:58 DEBUG    TICKER   DEV          [now:126] found new pair ALGOUSDT YFIBTC KEYETH SRMBNB 
2020/09/04 13:36:59 DEBUG    TICKER   DEV          [now:128] found new pair YFIIBNB BNTETH 
2020/09/04 13:37:01 DEBUG    TICKER   DEV          [now:130] found new pair SANDBNB XRPBNB 
2020/09/04 13:37:10 DEBUG    STREAM   DEV          BookTicker Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          OrderBook Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Trade Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          AggTrade Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_1m Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_3m Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_5m Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_15m Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_30m Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_1h Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_2h Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_4h Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_8h Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_12h Stream created
2020/09/04 13:37:10 DEBUG    STREAM   DEV          Kline_1d Stream created
2020/09/04 13:37:11 DEBUG    STREAM   DEV          Kline_3d Stream created
2020/09/04 13:37:11 DEBUG    STREAM   DEV          Kline_1w Stream created
2020/09/04 13:37:11 DEBUG    STREAM   DEV          Kline_1M Stream created
2020/09/04 13:37:55 DEBUG    INFO     DEV          19 streams, 130 active pairs, 130 pricepairs, 8 ptimes
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of create_stream    > min:1ms    max:119ms  avg:8ms    last:1ms   
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of sort_ticker      > min:38ms   max:699ms  avg:83ms   last:46ms  
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of ensure_streams   > min:222µs  max:177ms  avg:83ms   last:92ms  
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of sort_klines      > min:212µs  max:2ms    avg:515µs  last:410µs 
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of sort_trades      > min:212µs  max:2ms    avg:438µs  last:777µs 
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of sort_prices      > min:575µs  max:6ms    avg:1ms    last:603µs 
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of update_stream    > min:609µs  max:3ms    avg:949µs  last:621µs 
2020/09/04 13:37:55 DEBUG    PTIMES   DEV          process time of process_buffer   > min:267µs  max:4ms    avg:536µs  last:271µs 
h5tec commented 4 years ago

Just got error again

2020-09-04 15:39:08,143 --- CRITICAL --- PROCESS: 12979 --- THREAD: 140378210543360] --- MODULE: unicorn_binance_websocket_api_socket>
BinanceWebSocketApiSocket->start_socket(11ebfad5-2738-41e7-88fa-4eb31b731ba7, ['depth@100ms'], ['rlceth', 'zeceth', 'xrpbnb', 'ontbnb', 'yfibtc', 'atombnb', 'knceth', 'linkdownusdt', 'adaeth', 'batusdt', 'neousdt', 'dotbnb', 'sushibtc', 'bnbbtc', 'sxpusdt', 'veteth', 'qtumusdt', 'solbnb', 'xtzusdt', 'compusdt', 'kavausdt', 'etcusdt', 'bchbtc', 'eosbnb', 'xmrbnb', 'ksmbnb', 'bandbnb', 'eosusdt', 'renusdt', 'ltcusdt', 'omgeth', 'winbnb', 'adabnb', 'qtumbtc', 'omgbtc', 'bchusdt', 'yfiiusdt', 'egldbnb', 'bnteth', 'yfiibtc', 'omgusdt', 'egldusdt', 'dotusdt', 'vetbnb', 'adausdt', 'crvbnb', 'bchbnb', 'trxbtc', 'linkbtc', 'onteth', 'sandbnb', 'neoeth', 'xmrusdt', 'xrpbtc', 'atombtc', 'dashusdt', 'bandusdt', 'srmbnb', 'avabnb', 'btcusdt', 'keyeth', 'bttusdt', 'jstusdt', 'linkusdt', 'lendbtc', 'algousdt', 'linketh', 'lendeth', 'yfibnb', 'xrpeth', 'bttbnb', 'xemeth', 'ontusdt', 'zileth', 'bnbeth', 'adabtc', 'srmbtc', 'dgbbnb', 'yfiibnb', 'etcbnb', 'atomusdt', 'vetusdt', 'jstbtc', 'algobnb', 'trxbnb', 'vetbtc', 'bandbtc', 'ltceth', 'cotibnb', 'bnbusdt', 'iostusdt', 'xlmusdt', 'dotbtc', 'srmusdt', 'ethbtc', 'sxpbnb', 'trxeth', 'jstbnb', 'wavesusdt', 'lrceth', 'linkupusdt', 'ltcbnb', 'sushiusdt', 'runebnb', 'qtumeth', 'zecbnb', 'xtzbnb', 'xtzbtc', 'xembtc', 'yfiusdt', 'trxusdt', 'ethusdt', 'crvusdt', 'xrpusdt', 'eoseth', 'ltcbtc', 'batbnb', 'etceth', 'diabnb', 'lendusdt', 'crvbtc', 'ftmbnb', 'maticbnb', 'eosbtc', 'kavabnb', 'sxpbtc', 'sushibnb', 'neobnb', 'zecusdt', 'fetbnb']) Exception AttributeError Info: module 'asyncio.base_futures' has no attribute 'InvalidStateError'

2020-09-04 15:39:08,144 --- CRITICAL --- PROCESS: 12979 --- THREAD: 140378210543360] --- MODULE: unicorn_binance_websocket_api_manager>
BinanceWebSocketApiManager->stream_is_crashing(11ebfad5-2738-41e7-88fa-4eb31b731ba7)

2020-09-04 15:39:08,349 --- CRITICAL --- PROCESS: 12979 --- THREAD: 140378227328768] --- MODULE: unicorn_binance_websocket_api_socket>
BinanceWebSocketApiSocket->start_socket(c52c8dce-cdb6-48c5-b551-07d649b449d4, ['bookTicker'], ['rlceth', 'zeceth', 'xrpbnb', 'ontbnb', 'yfibtc', 'atombnb', 'knceth', 'linkdownusdt', 'adaeth', 'batusdt', 'neousdt', 'dotbnb', 'sushibtc', 'bnbbtc', 'sxpusdt', 'veteth', 'qtumusdt', 'solbnb', 'xtzusdt', 'compusdt', 'kavausdt', 'etcusdt', 'bchbtc', 'eosbnb', 'xmrbnb', 'ksmbnb', 'bandbnb', 'eosusdt', 'renusdt', 'ltcusdt', 'omgeth', 'winbnb', 'adabnb', 'qtumbtc', 'omgbtc', 'bchusdt', 'yfiiusdt', 'egldbnb', 'bnteth', 'yfiibtc', 'omgusdt', 'egldusdt', 'dotusdt', 'vetbnb', 'adausdt', 'crvbnb', 'bchbnb', 'trxbtc', 'linkbtc', 'onteth', 'sandbnb', 'neoeth', 'xmrusdt', 'xrpbtc', 'atombtc', 'dashusdt', 'bandusdt', 'srmbnb', 'avabnb', 'btcusdt', 'keyeth', 'bttusdt', 'jstusdt', 'linkusdt', 'lendbtc', 'algousdt', 'linketh', 'lendeth', 'yfibnb', 'xrpeth', 'bttbnb', 'xemeth', 'ontusdt', 'zileth', 'bnbeth', 'adabtc', 'srmbtc', 'dgbbnb', 'yfiibnb', 'etcbnb', 'atomusdt', 'vetusdt', 'jstbtc', 'algobnb', 'trxbnb', 'vetbtc', 'bandbtc', 'ltceth', 'cotibnb', 'bnbusdt', 'iostusdt', 'xlmusdt', 'dotbtc', 'srmusdt', 'ethbtc', 'sxpbnb', 'trxeth', 'jstbnb', 'wavesusdt', 'lrceth', 'linkupusdt', 'ltcbnb', 'sushiusdt', 'runebnb', 'qtumeth', 'zecbnb', 'xtzbnb', 'xtzbtc', 'xembtc', 'yfiusdt', 'trxusdt', 'ethusdt', 'crvusdt', 'xrpusdt', 'eoseth', 'ltcbtc', 'batbnb', 'etceth', 'diabnb', 'lendusdt', 'crvbtc', 'ftmbnb', 'maticbnb', 'eosbtc', 'kavabnb', 'sxpbtc', 'sushibnb', 'neobnb', 'zecusdt', 'fetbnb']) Exception AttributeError Info: module 'asyncio.base_futures' has no attribute 'InvalidStateError'

2020-09-04 15:39:08,349 --- CRITICAL --- PROCESS: 12979 --- THREAD: 140378227328768] --- MODULE: unicorn_binance_websocket_api_manager>
BinanceWebSocketApiManager->stream_is_crashing(c52c8dce-cdb6-48c5-b551-07d649b449d4)
h5tec commented 4 years ago

This is the way my logger works:

import logging
from logging import getLogger
from logging.handlers import RotatingFileHandler

log_handler   = RotatingFileHandler('/var/log/binancebot.log', mode='a', maxBytes=10*1024*1024, backupCount=3, encoding="utf-8", delay=0)
log_formatter = logging.Formatter("{asctime} --- {levelname:8} --- PROCESS: {process} --- THREAD: {thread}] --- MODULE: {module}>\n{message}\n",style="{")
#sys.stdout    = log_handler
#sys.stderr    = log_handler
applog        = logging.getLogger()
log_handler.setFormatter(log_formatter)
log_handler.setLevel(logging.WARNING)
applog.setLevel(logging.WARNING)
applog.addHandler(log_handler)
oliver-zehentleitner commented 4 years ago

Look, if the stream crashes for any reason and we can catch that and restart the stream, i think thats fine.

I think the asyncio invalidstate error is cached, does the restart on your system work?

A reason for invalid state error can also be missing system resources, please monitor your system!

oliver-zehentleitner commented 3 years ago

I will test the limit, official is 1024... Thanks for the hint!

oliver-zehentleitner commented 3 years ago

I had not problems with 1024 streams during a 4 day test.

InvalidStateError: https://github.com/oliver-zehentleitner/unicorn-binance-websocket-api/issues/110