LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

Unable to open many websockets at one time #29

Closed MikeHibbert closed 5 years ago

MikeHibbert commented 5 years ago

Hi

I've been trying out this module and I'm aware that there is a limit on how many characters you can have in the URI (the documentation says 8004).

I was wondering if adding multiple websockets into the socket manager would be possible?

So far I've been breaking up my symbols into groups of 50 but once I get past creating the first websocket I get errors saying:

ERROR:asyncio:Task was destroyed but it is pending! task: <Task pending coro=<WebSocketCommonProtocol.close_connection() done, defined at /home/mike/Documents/python/***/lib/python3.6/site-packages/websockets/protocol.py:1114> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f11118670d8>()]>>

This is the code that creates the streams:

        MARKETS_SIZE = 50

        market_sections = [self.markets[x:x+MARKETS_SIZE] for x in range(0, len(self.markets), MARKETS_SIZE)]

        self.bm = BinanceWebSocketApiManager(self.process_unicorn_streams) 
        for market_section in market_sections:
            if self.bm.is_websocket_uri_length_valid(self.channels, market_section):
                stream_id = self.bm.create_stream(self.channels, market_section)
                self.unicorn_multi_stream_ids.append(stream_id)

process_unicorn_streams looks like this:

def process_unicorn_streams(self, received_stream_data_json, exchange='binance.com'):
    # Normalise the raw payload with the UnicornFy parser for the given exchange.
    if exchange == "binance.com":
        unicorn_fied_stream_data = UnicornFy.binance_com_websocket(received_stream_data_json)
    elif exchange == "binance.je":
        unicorn_fied_stream_data = UnicornFy.binance_je_websocket(received_stream_data_json)
    else:
        logger.error("Not a valid exchange: " + str(exchange))
        return  # nothing was parsed, so there is nothing to dispatch

    if 'data' in unicorn_fied_stream_data.keys():
        # Payload with a 'data' list: look up the coin and hand over the message.
        for data in unicorn_fied_stream_data['data']:
            symbol = unicorn_fied_stream_data['data'][0]['symbol']

            coin = self.coins[symbol]

            coin.process_unicorn_message(unicorn_fied_stream_data)

    else:
        # Flat payload: the symbol sits at the top level.
        symbol = unicorn_fied_stream_data['symbol']

        coin = self.coins[symbol]

        coin.process_unicorn_message(unicorn_fied_stream_data)

I'm using version 1.6.3, which I installed initially with pip and later using git+ to get it straight from this repo. I'm using Python 3.6.9 on Linux.
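
The chunking step described above can be sketched in isolation, together with a rough estimate of the resulting URI length. This is only an illustration: `BASE_URI` assumes Binance's combined-stream endpoint format, the symbols are placeholders, and `chunk` and `estimated_uri_length` are hypothetical helpers. The library's own `is_websocket_uri_length_valid` remains the check to rely on.

```python
from typing import List

# Assumption: combined-stream endpoint format; the library may build its URI differently.
BASE_URI = "wss://stream.binance.com:9443/stream?streams="
MAX_URI_LENGTH = 8004  # limit quoted from the documentation in this thread


def chunk(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive sections of at most `size` entries."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


def estimated_uri_length(channels: List[str], markets: List[str]) -> int:
    """Length of a URI carrying one `market@channel` entry per pair, joined by '/'."""
    streams = [f"{market}@{channel}" for market in markets for channel in channels]
    return len(BASE_URI + "/".join(streams))


channels = ["kline_5m", "kline_15m", "kline_30m", "kline_1w", "ticker", "depth", "trade"]
markets = [f"coin{i:03d}usdt" for i in range(120)]  # placeholder symbols, not real pairs
sections = chunk(markets, 50)

for section in sections:
    length = estimated_uri_length(channels, section)
    print(len(section), "markets ->", length, "characters, fits:", length <= MAX_URI_LENGTH)
```

Because every market is combined with every channel, the URI grows with the product of the two list sizes, so adding channels may require smaller sections.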

oliver-zehentleitner commented 5 years ago

Hi Mike!

Please try it with the stream_buffer instead of the callback function!

Use this to create the streams:

        MARKETS_SIZE = 50

        market_sections = [self.markets[x:x+MARKETS_SIZE] for x in range(0, len(self.markets), MARKETS_SIZE)]

        self.bm = BinanceWebSocketApiManager() 
        for market_section in market_sections:
            if self.bm.is_websocket_uri_length_valid(self.channels, market_section):
                stream_id = self.bm.create_stream(self.channels, market_section)
                self.unicorn_multi_stream_ids.append(stream_id)

Does it work now?

There is one problem if you use the callback_function: YOU DON'T GET AN ERROR TRACE IF THERE IS A BUG WITHIN THE CALLBACK CODE!!! (See this issue: https://github.com/unicorn-data-analysis/unicorn-binance-websocket-api/issues/18)
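
If the callback approach is kept anyway, one way to make failures visible is to wrap the callback so that any exception is logged with its traceback. This is a minimal sketch in plain Python; `traced` is a hypothetical helper and not part of unicorn-binance-websocket-api.

```python
import functools
import logging
import traceback
from typing import Any, Callable

logger = logging.getLogger("stream_callback")


def traced(callback: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Return a wrapper that logs the full traceback when `callback` raises."""

    @functools.wraps(callback)
    def wrapper(message: Any) -> Any:
        try:
            return callback(message)
        except Exception:
            # Log the traceback explicitly so a bug in the callback is not lost.
            logger.error("callback failed:\n%s", traceback.format_exc())
            return None

    return wrapper


def faulty_callback(message: str) -> None:
    raise KeyError(message)  # simulated bug inside the callback


safe_callback = traced(faulty_callback)
safe_callback("btcusdt")  # logs the KeyError traceback instead of failing silently
```

The wrapped function can then be passed wherever the original callback was used.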

MikeHibbert commented 5 years ago

OK! So I made some alterations and got the same results:

The code:

        self.bm = BinanceWebSocketApiManager()
        for market_section in market_sections:
            if self.bm.is_websocket_uri_length_valid(self.channels, market_section):
                stream_id = self.bm.create_stream(self.channels, market_section)
                self.unicorn_multi_stream_ids.append(stream_id)

                self.bm.wait_till_stream_has_started(stream_id)

        end = arrow.now()
        logger.error("Started binance websockets - created coins taking: {}".format(end - start))

        try:
            while True:
                if self.bm.is_manager_stopping():
                    exit(0)

                old_stream_data = self.bm.pop_stream_data_from_stream_buffer()
                if old_stream_data is False:
                    time.sleep(0.01)
                else:
                    try:
                        self.process_unicorn_streams(old_stream_data)
                    except Exception as e:
                        logger.error(e)
                        self.bm.add_to_stream_buffer(old_stream_data)
                    #print("Websocket status {}".format(self.bm.get_stream_info(stream_id)))

        except Exception as e:
            logger.error(e)
            #for stream_id in self.unicorn_multi_stream_ids:
                #self.bm.stop_stream(stream_id)

            self.bm.stop_manager_with_all_streams()

The error:

ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<WebSocketCommonProtocol.close_connection() done, defined at /home/mike/Documents/python/*******/lib/python3.6/site-packages/websockets/protocol.py:1114> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb87c57a048>()]>>
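
One thing to note about the loop above: a message that always raises in `process_unicorn_streams` is put back into the buffer every time, so it is retried indefinitely. The sketch below shows the same pop, process and requeue pattern with a bounded number of attempts. `FakeBuffer` is a hypothetical stand-in for the manager's stream buffer (it returns `False` when empty, as the loop above expects), and `drain` is an illustrative helper, not library code.

```python
from collections import deque
from typing import Callable, Dict, Iterable, List, Tuple, Union


class FakeBuffer:
    """Hypothetical stand-in for the stream buffer: pop() returns False when empty."""

    def __init__(self, items: Iterable[str]) -> None:
        self._items = deque(items)

    def pop(self) -> Union[str, bool]:
        return self._items.popleft() if self._items else False

    def add(self, item: str) -> None:
        self._items.append(item)


def drain(buffer: FakeBuffer, handler: Callable[[str], None],
          max_attempts: int = 3) -> Tuple[List[str], List[str]]:
    """Process buffered items, requeueing failures at most `max_attempts` times in total."""
    attempts: Dict[str, int] = {}
    processed: List[str] = []
    dropped: List[str] = []
    while True:
        item = buffer.pop()
        if item is False:
            break  # buffer is empty; a live loop would sleep briefly and poll again
        try:
            handler(item)
            processed.append(item)
        except Exception:
            attempts[item] = attempts.get(item, 0) + 1
            if attempts[item] < max_attempts:
                buffer.add(item)  # give the item another chance later
            else:
                dropped.append(item)  # stop retrying a message that keeps failing
    return processed, dropped


def handler(item: str) -> None:
    if item == "bad":
        raise ValueError("cannot process " + item)


processed, dropped = drain(FakeBuffer(["a", "bad", "b"]), handler)
```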

oliver-zehentleitner commented 5 years ago

I don't know; maybe the wrong websockets version? You haven't provided the information I need and asked for...

MikeHibbert commented 5 years ago

OK, here's my pip freeze... sorry, I didn't know you needed to know about the websockets version:

aiodns==1.1.1
aiohttp==3.5.4
amqp==1.4.9
aniso8601==7.0.0
anyjson==0.3.3
arrow==0.14.3
asn1crypto==0.24.0
async-timeout==3.0.1
attrs==19.1.0
autobahn==19.7.2
Automat==0.7.0
backports.functools-lru-cache==1.5
billiard==3.3.0.23
ccxt==1.18.991
celery==3.1.26.post2
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
cheroot==6.5.5
Click==7.0
colorama==0.4.1
constantly==15.1.0
cryptography==2.7
dateparser==0.7.1
Django==2.2.3
django-celery==3.3.0
django-cors-headers==3.0.2
django-extensions==2.2.1
django-proxy==1.2.1
django-rest-framework==0.1.0
django-webpush==0.3.1
djangorestframework==3.10.1
djangorestframework-jwt==1.11.0
Flask==1.1.1
Flask-RESTful==0.3.7
gevent==1.4.0
greenlet==0.4.15
gunicorn==19.9.0
http-ece==1.1.0
hyperlink==19.0.0
idna==2.8
idna-ssl==1.1.0
incremental==17.5.0
iso8601==0.1.12
itsdangerous==1.1.0
Jinja2==2.10.1
kombu==3.0.37
MarkupSafe==1.1.1
more-itertools==7.2.0
multidict==4.5.2
numpy==1.17.0
pandas==0.25.0
pathlib==1.0.1
psycopg2==2.8.3
py-vapid==1.7.0
pyasn1==0.4.5
pyasn1-modules==0.2.5
pycares==3.0.0
pycparser==2.19
PyHamcrest==1.9.0
PyJWT==1.7.1
pyOpenSSL==19.0.0
python-binance==0.7.1
python-dateutil==2.8.0
pytz==2019.1
pywebpush==1.9.4
regex==2019.6.8
requests==2.22.0
rfc3339==6.0
scipy==1.3.0
service-identity==18.1.0
six==1.12.0
sqlparse==0.3.0
Twisted==19.2.1
txaio==18.8.1
typing-extensions==3.7.4
tzlocal==2.0.0
unicorn-binance-websocket-api==1.6.3.dev0
unicorn-fy==0.2.0
urllib3==1.25.3
vine==1.3.0
websocket-client==0.56.0
websockets==8.0.1
Werkzeug==0.15.5
yarl==1.1.0
zope.interface==4.6.0

oliver-zehentleitner commented 5 years ago

Oh, maybe that's the reason: you are using websockets 8.0.1, which is new. unicorn-binance-websocket-api is built for websockets 7.0.

Try: pip install websockets==7.0

oliver-zehentleitner commented 5 years ago

I made a new release and linked the package to websockets 7.0 so you can try with: pip install unicorn-binance-websocket-api --upgrade

If it's still not working, please check that Python is using the 1.6.5 lib and not the 1.6.4.dev you installed from git. You can use https://github.com/unicorn-data-analysis/unicorn-binance-websocket-api/blob/master/tools/get_used_module_version.py for that.
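
Independently of the linked tool, the version that the running interpreter actually resolves can be checked from Python itself. This is a minimal sketch, assuming Python 3.8 or newer, where `importlib.metadata` is in the standard library (it is not available on the Python 3.6 used in this thread). `installed_version` and `matches_pin` are hypothetical helpers.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def matches_pin(version: Optional[str], pin: str) -> bool:
    """True if `version` equals `pin` or is a more specific release of it (7.0 -> 7.0.1)."""
    return version is not None and (version == pin or version.startswith(pin + "."))


for name in ("websockets", "unicorn-binance-websocket-api"):
    print(name, "->", installed_version(name))

print("websockets pinned to 7.0:", matches_pin(installed_version("websockets"), "7.0"))
```

Running this with the same interpreter that starts the script helps rule out a second, parallel installation being picked up.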

MikeHibbert commented 5 years ago

I've installed websockets==7.0 and that doesn't seem to have fixed it, I'm afraid.

Just to add a bit more detail here are the details of my channels:

self.channels = ['kline_5m',  'kline_15m',  'kline_30m',  'kline_1w', 'ticker', 'depth', 'trade']

I'm basically taking all the coins available on Binance that are not PAX, USDC, ETH or TUSD pairs and creating websockets for them all

I think the list is a few thousand coin pairs. Not sure if that makes any difference, or whether it goes over the limits?

MikeHibbert commented 5 years ago

Just thought it might help if I give you some code that does exactly what I'm trying to achieve

It uses python-binance to get the coinpair symbols and your module for the rest:

from binance.client import Client # from python-binance module
from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager
from unicorn_fy.unicorn_fy import UnicornFy
import logging

logger = logging.getLogger(__name__)

channels = ['kline_5m',  'kline_15m',  'kline_30m',  'kline_1w', 'ticker', 'depth', 'trade']

def process_unicorn_streams(message):
    logger.info(message)

def get_markets():
    client = Client()

    exchange_info = client.get_exchange_info()

    markets = []
    for coin in exchange_info.get('symbols'):
        if coin.get('quoteAsset') in ['PAX', 'USDC', 'TUSD', 'ETH']:
            continue                

        symbol = "{}{}".format(coin.get('baseAsset'), coin.get('quoteAsset'))
        markets.append(symbol.lower())    

    MARKETS_SIZE = 20

    market_sections = [self.markets[x:x+MARKETS_SIZE] for x in range(0, len(self.markets), MARKETS_SIZE)]

    return market_sections

def create_websockets(market_sections):
    global channels
    unicorn_multi_stream_ids = []

    bm = BinanceWebSocketApiManager()
    for market_section in market_sections:
        if bm.is_websocket_uri_length_valid(channels, market_section):
            stream_id = bm.create_stream(channels, market_section)
            unicorn_multi_stream_ids.append(stream_id)   

    return bm, unicorn_multi_stream_ids

if __name__ == "__MAIN__":
    market_sections = get_markets()

    bm, stream_ids = create_websockets(market_sections)

    try:
        while True:
            if bm.is_manager_stopping():
                exit(0)

            old_stream_data = bm.pop_stream_data_from_stream_buffer()
            if old_stream_data is False:
                time.sleep(0.01)
            else:
                try:
                    process_unicorn_streams(old_stream_data)
                except Exception as e:
                    logger.error(e)
                    bm.add_to_stream_buffer(old_stream_data)
                    #print("Websocket status {}".format(self.bm.get_stream_info(stream_id)))

        except Exception as e:
            logger.error(e)
            #for stream_id in self.unicorn_multi_stream_ids:
                #self.bm.stop_stream(stream_id)

            bm.stop_manager_with_all_streams()    

Hope this helps you duplicate the issue!

oliver-zehentleitner commented 5 years ago

Sorry, I tried to start your script and had to change several things to make it run.

I had to remove "self" in several places and fix the indentation. If I add print(str(market_sections)), nothing is printed; it's empty.

I asked you questions that you did not answer. Did you check that you are using the new version? I have had problems with libraries installed in parallel; it's worth looking into and manually removing old packages with pip.

I am going to help you, OK, but give me all the answers I asked for; I don't want to ask the same question again and again. If you want my help and my time, you have to prepare the information I need, otherwise it will end in endless ping-pong and waste my time.

I stopped changing your script to make it run; that's your part. Then I can download it, start it, see your problem and find a solution. I think that will be the fastest way for both of us!

It's quite possible that you are using python2 and not python3 to start the script.

Make me a fully running script I can test, OR start with an example script from this repository and extend it.

I don't think there is a limit on Binance's side; I stream more than 2-3 TB of data per month with a lot of streams. But start with fewer websockets, or with a sleep between creating them, so you can test whether there is a limit.
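
The suggestion to sleep between creating streams can be sketched as follows. `create_streams_throttled` and `fake_create_stream` are hypothetical; in the real script the callable would wrap `bm.create_stream(channels, market_section)`. Note also that Python sets `__name__` to the lowercase string "__main__", so a guard comparing against "__MAIN__" never runs its body.

```python
import time
from typing import Callable, List, Sequence


def create_streams_throttled(create_stream: Callable[[Sequence[str]], str],
                             sections: Sequence[Sequence[str]],
                             delay_seconds: float = 1.0,
                             sleep: Callable[[float], None] = time.sleep) -> List[str]:
    """Create one stream per section, pausing between consecutive creations."""
    stream_ids: List[str] = []
    for index, section in enumerate(sections):
        if index > 0:
            sleep(delay_seconds)  # no pause before the first stream
        stream_ids.append(create_stream(section))
    return stream_ids


def fake_create_stream(section: Sequence[str]) -> str:
    """Hypothetical stand-in that returns a made-up stream id."""
    return "stream-for-" + "-".join(section)


if __name__ == "__main__":  # lowercase: "__MAIN__" would never match
    ids = create_streams_throttled(fake_create_stream,
                                   [["btcusdt"], ["ethusdt"]],
                                   delay_seconds=0.0)
    print(ids)
```

Starting with a small number of sections and a generous delay, then reducing the delay, is one way to test whether the failures depend on how quickly the connections are opened.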

sean-bytefoundry commented 5 years ago

Hi, I'm having the same problem. I stripped out all the code that wasn't relevant so I could post it here for you to review, and removing the code that performed database inserts seems to have helped (of course it doesn't "help" to be unable to insert data, but it reduces the frequency of that error). I still see the odd ERROR:asyncio:Task was destroyed but it is pending! with the stripped-down code after it has been running for a while, but what comes up more frequently, after a couple of minutes of running, is the error below. Code is attached.

binance_web_sockets_test.py.zip

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
  File "/usr/local/lib/python3.7/site-packages/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py", line 406, in _keepalive_streams
  File "/usr/local/lib/python3.7/site-packages/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py", line 1685, in restart_stream
  File "/usr/local/lib/python3.7/asyncio/events.py", line 762, in new_event_loop
  File "/usr/local/lib/python3.7/asyncio/events.py", line 660, in new_event_loop
  File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 51, in __init__
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 55, in __init__
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 102, in _make_self_pipe
  File "/usr/local/lib/python3.7/socket.py", line 491, in socketpair
OSError: [Errno 24] Too many open files

Exception ignored in: <function BaseEventLoop.__del__ at 0x7f08caeb49e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 620, in __del__
  File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 55, in close
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 86, in close
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 93, in _close_self_pipe
AttributeError: '_UnixSelectorEventLoop' object has no attribute '_ssock'
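
The first traceback ends in OSError: [Errno 24] Too many open files, raised while asyncio creates the self-pipe socket pair for a new event loop. On Linux, errno 24 is EMFILE, the per-process limit on open file descriptors. The sketch below is a minimal way to inspect that limit, assuming a Unix system (the `resource` module is not available on Windows). `open_file_limits` and `describe_errno` are hypothetical helpers. The thread does not establish whether raising the limit is the right fix, as opposed to finding descriptors that are not released when streams restart.

```python
import errno
import resource  # Unix only
from typing import Tuple


def open_file_limits() -> Tuple[int, int]:
    """Return the (soft, hard) limits on open file descriptors for this process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def describe_errno(code: int) -> str:
    """Map a numeric errno to its symbolic name, or 'UNKNOWN' if there is none."""
    return errno.errorcode.get(code, "UNKNOWN")


soft, hard = open_file_limits()
print("errno 24 is", describe_errno(24))
print("soft limit:", soft, "hard limit:", hard)
```

Every open socket counts against the soft limit, and so does the socket pair of each event loop, so many streams combined with frequent restarts can exhaust a low default.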

sean-bytefoundry commented 5 years ago

And now this...

ERROR:root:BinanceWebSocketApiConnection->await._conn.__aenter__(2b1cf8b5-4083-4e52-873d-b30a5f984eab, {'kline_1m', 'kline_5m', 'kline_1w', 'kline_3m', 'kline_4h', 'kline_1d', 'kline_1h', 'kline_8h', 'kline_6h', 'kline_15m', 'kline_3d', 'kline_1M', 'kline_12h', 'kline_2h', 'kline_30m'}, ['hotusdt', 'zilusdt', 'zrxbnb']) - OSError - [Errno -2] Name or service not known
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<WebSocketCommonProtocol.close_connection() done, defined at /usr/local/lib/python3.7/site-packages/websockets/protocol.py:1002> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f368408c450>()]>>

oliver-zehentleitner commented 5 years ago

Hi Sean!

These errors look very different to me!

Please read https://github.com/unicorn-data-analysis/unicorn-binance-websocket-api/wiki/Issue-Guidelines

and open a new bug report. I need the information you are asked to provide when opening an issue in order to understand your environment and your problem.

Best regards, Oliver

oliver-zehentleitner commented 5 years ago

Oh, and please post the code here as text, not as a zip download; I don't want to download files to my system.

Post it as text or post a link to your repository, please. That's safer for me.

oliver-zehentleitner commented 5 years ago

Did you guys find the reason why it didn't work? Can we close this issue?