LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket API`s (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/
Other
683 stars 165 forks source link

Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received BinanceWebSocketApiManager.stream_is_crashing #284

Closed jmy48 closed 1 year ago

jmy48 commented 2 years ago

Version of this library.

unicorn_fy: 0.12.2 unicorn_binance_local_depth_cache: not found unicorn_binance_rest_api: not found unicorn_binance_trailing_stop_loss: not found unicorn_binance_websocket_api: 1.41.0

Solution to Issue cannot be found in the documentation or other Issues and also occurs in the latest version of this library.

Hardware?

Local server/workstation

Operating System?

macOS

Python version?

Python3.10

Installed packages

No response

Logging output

No response

Processing method?

process_stream_data

Used endpoint?

binance.com

Issue

BinanceWebSocketApiSocket.start_socket(9838d3bba33c-e3a3-8a3e-b38a-bb5fe9d6, ['trade'], ['kncusdt',...] Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received BinanceWebSocketApiManager.stream_is_crashing(29ed4cffd186-0f68-1ceb-b5a8-5153f96d)

after running for ~20 minutes with 206 symbols. The program doesn't stop, however. What does it mean? Is it dropping data?

rafalsza commented 2 years ago

have the same issue, my code:

import pandas as pd
import time
import threading
import datetime as dt
from binance.client import Client
from sqlalchemy import create_engine

def import_hist_data(coins, interval, start_str, end_str):
    for coin in coins:
        client = Client()
        df = pd.DataFrame(client.get_historical_klines(coin, interval, start_str, end_str)).astype(float)
        print(f"Fetching new bars for {coin} {interval} {dt.datetime.now().isoformat()}")
        df = df.iloc[:, :6]
        df.columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        # df = df.set_index('timestamp')
        # df.index = pd.to_datetime(df.index, unit='ms')
        df.timestamp = pd.to_datetime(df.timestamp, unit='ms')
        df.to_sql(coin, engine, index=False, if_exists='replace')

def SQLimport(data):
    if data['kline']['is_closed']:
        timestamp = data['event_time']
        coin = data['symbol']
        open = data['kline']['open_price']
        high = data['kline']['high_price']
        low = data['kline']['low_price']
        close = data['kline']['close_price']
        volume = data['kline']['base_volume']
        frame = pd.DataFrame([[timestamp, open, high, low, close, volume]],
                             columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']).astype(float)
        frame.timestamp = pd.to_datetime(frame.timestamp, unit='ms')
        frame.to_sql(coin, engine, index=False, if_exists='append')

def stream_data_from_stream_buffer(binance_websocket_api_manager):
    while True:
        if binance_websocket_api_manager.is_manager_stopping():
            exit(0)
        data = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
        if data is False:
            time.sleep(0.01)
        else:
            try:
                if len(data) > 3:
                    SQLimport(data)
            except Exception:
                # not able to process the data? write it back to the stream_buffer
                binance_websocket_api_manager.add_to_stream_buffer(data)
            except KeyboardInterrupt:
                print("\nStopping ... just wait a few seconds!")
                bwam.stop_manager_with_all_streams()

if __name__ == '__main__':
    engine = create_engine('postgresql://postgres:postgres@xx.xx.xx.xxx:5432/cryptolive_1h')
    bwam = BinanceWebSocketApiManager(exchange="binance.com")

    interval = '1h'
    # start_str = '30 days ago UTC'
    start_str = '18 months ago UTC'
    end_str = f'{dt.datetime.now()}'

    symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT', 'SOLUSDT', 'DOTUSDT', 'DOGEUSDT', 'AVAXUSDT',
               'TRXUSDT', 'LTCUSDT']

    import_hist_data(symbols, interval, start_str, end_str)

    worker_thread = threading.Thread(target=stream_data_from_stream_buffer, args=(bwam,))
    worker_thread.start()

    kline_stream_id = bwam.create_stream(['kline_1h'], [i.lower() for i in symbols], output='UnicornFy')
    # try:
    #     while True:
    #         time.sleep(60)
    # except KeyboardInterrupt:
    #     print("\nStopping ... just wait a few seconds!")
    #     bwam.stop_manager_with_all_streams()
lukaszbinden commented 2 years ago

First of all, thanks for sharing this great and useful library! it is nicely designed.

Unfortunately, I am experiencing the same issue as described above. IMO the issue's occurrence seems OS dependent. When started on Ubuntu 20.04, the error "sent 1011 (unexpected error) keepalive ping timeout; no close frame received" occurs within 5 mins for the first time and continues to repeat (tested multiple times). However, when I run the same code on Windows 10 Pro. the error does not show up within 10 minutes (not tested extensively though). Tried with parameter ping_timeout_default=10 and ping_timeout_default=50, but seemed to have no effect.

Version of this library. unicorn-binance-websocket-api 1.41.0 unicorn-fy 0.11.1

Hardware? Virtual private server

Operating System? Ubuntu 20.04 (64 Bit) / Linux 5.4.0-65-generic #73-Ubuntu SMP Mon Jan 18 17:25:17 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Python version? Python 3.7.11

Logging output 2022-08-10 08:17:00,367 | CRITICAL | _create_stream_thread: stream_id=40a0ae1a72eb-8058-a62f-46f2-dcba4082, time=1660112193.3642814 | BinanceWebSocketApiSocket.start_socket(40a0ae1a72eb-8058-a62f-46f2-dcba4082, ['arr'], ['!bookTicker']) - Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received 2022-08-10 08:17:00,367 | CRITICAL | _create_stream_thread: stream_id=40a0ae1a72eb-8058-a62f-46f2-dcba4082, time=1660112193.3642814 | BinanceWebSocketApiManager.stream_is_crashing(40a0ae1a72eb-8058-a62f-46f2-dcba4082) 2022-08-10 08:17:49,941 | CRITICAL | _create_stream_thread: stream_id=e2ccc73dc223-77ba-fd62-9567-4557fcd8, time=1660112185.3410606 | BinanceWebSocketApiSocket.start_socket(e2ccc73dc223-77ba-fd62-9567-4557fcd8, {'bookTicker'}, ['enjusdt', 'api3usdt', 'zrxusdt', 'peopleusdt', 'lrcusdt', 'c98usdt', 'xmrusdt', 'aaveusdt', 'dogeusdt', 'trbusdt', 'flmusdt', 'arusdt', 'rvnusdt', 'cotiusdt', 'dotusdt', 'tlmusdt', 'neousdt', 'bandusdt', 'antusdt', 'sklusdt', 'crvusdt', 'manausdt', 'rayusdt', 'celousdt', 'srmusdt', 'hntusdt', 'rlcusdt', 'blzusdt', 'jasmyusdt', 'ontusdt', 'reefusdt', 'sushiusdt', 'duskusdt', 'cvcusdt', 'zilusdt', 'flowusdt', 'darusdt', 'egldusdt', 'sxpusdt', 'gtcusdt', 'hotusdt', 'zecusdt', 'axsusdt', 'algousdt', 'filusdt', 'oneusdt', 'batusdt', 'thetausdt', 'bnxusdt', 'renusdt', 'aliceusdt', 'gmtusdt', 'dydxusdt', 'lptusdt', 'nknusdt', 'yfiusdt', 'adausdt', 'belusdt', 'ksmusdt', 'linausdt', 'fttusdt', 'btcusdt', 'ensusdt', 'apeusdt', 'oceanusdt', 'atomusdt', 'ctkusdt', 'sfpusdt', 'trxusdt', 'ankrusdt', 'dashusdt', 'hbarusdt', 'imxusdt', 'dgbusdt', 'ognusdt', 'ctsiusdt', 'woousdt', 'btsusdt', 'klayusdt', 'xlmusdt', 'xemusdt', 'snxusdt', 'icpusdt', 'tomousdt', 'eosusdt', 'iotxusdt', 'audiousdt', 'chrusdt', 'arpausdt', 'bnbusdt', 'ltcusdt', 'stmxusdt', 'nearusdt', 'atausdt', 'icxusdt', 'qtumusdt', 'wavesusdt', 'etcusdt', 'maskusdt', 'balusdt', 'galusdt', 'zenusdt', 'opusdt', 'chzusdt', 'grtusdt', 'kavausdt', 'vetusdt', 'bchusdt', 'dentusdt', 'xrpusdt', 'ftmusdt', 'celrusdt', 'storjusdt', 'iotausdt', 'iostusdt', 'roseusdt', 'kncusdt', 'scusdt', 'unfiusdt', 'galausdt', 'bakeusdt', 'maticusdt', '1inchusdt', 'rsrusdt', 'mkrusdt', 'ethusdt', 'sandusdt', 'litusdt', 'compusdt', 'runeusdt', 'xtzusdt', 'omgusdt', 'solusdt', 'linkusdt', 'mtlusdt', 'uniusdt', 'alphausdt', 'avaxusdt']) - Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received 2022-08-10 08:17:49,942 | CRITICAL | _create_stream_thread: stream_id=e2ccc73dc223-77ba-fd62-9567-4557fcd8, time=1660112185.3410606 | BinanceWebSocketApiManager.stream_is_crashing(e2ccc73dc223-77ba-fd62-9567-4557fcd8)

Processing method? Worked with process_stream_data first, then changed to pop_stream_data_from_stream_buffer() with separate Thread as described in best practices. however it did not seem to have an impact on the error.

Used endpoint? binance.com and binance.com-futures, each with about 140 markets and channel bookTicker.

Issue as described above.

Code `

    self._ws_api_manager_spot = BinanceWebSocketApiManager(exchange="binance.com",
                                                           throw_exception_if_unrepairable=True,
                                                           ping_timeout_default=10)
    self._ws_api_manager_futures = BinanceWebSocketApiManager(exchange="binance.com-futures",
                                                              throw_exception_if_unrepairable=True,
                                                              ping_timeout_default=10)

    spot_channels = {'bookTicker'}
    self._stream_id_spot_bookTicker = self._ws_api_manager_spot.create_stream(spot_channels, markets,
                                                                              api_key=self._binance_config['apiKey'],
                                                                              api_secret=self._binance_config['secret'],
                                                                              output="dict")

    self._stream_id_futures_bookTicker = self._ws_api_manager_futures.create_stream("arr", "!bookTicker",
                                                                                    api_key=self._binance_config['apiKey'],
                                                                                    api_secret=self._binance_config['secret'],
                                                                                    output="dict")

    spot_stream_worker = threading.Thread(target=self._process_stream_ticker_spot, name="SPOT-StreamWorker")
    spot_stream_worker.start()

def _process_stream_ticker_spot(self):
    time.sleep(30)
    while True:
        if self._ws_api_manager_spot.is_manager_stopping():
            LOG.error("_process_stream_ticker_spot(): _ws_api_manager_spot is stopping, thus exiting!")
            exit(0)

        stream_data = self._ws_api_manager_spot.pop_stream_data_from_stream_buffer()
        if stream_data is False:
            time.sleep(0.01)
        else:
            self._process_ticker_spot_msg(stream_data)

`

halesyy commented 2 years ago

I'm getting this a lot, but my natural inclination as-to why was because pop_stream_data_from_stream_buffer isn't up-to-date, and there's some local compute which is causing a pile-up of old stream data. Unsure if this is the case, but I'm trying to fix this issue as well...

halesyy commented 2 years ago

Just to add to this, when I check in on my program after a while and I see it's been rolling these exceptions, what seems to fix it is by holding the terminal (to pause the main program) then releasing it. This seems to cause the stream_is_crashing function to get called, which then re-connects all the streams, and gets everything going again. I feel that this means that the disconnect/reconnect logic isn't working? Unsure.

Further, setting the no. of markets per-stream has helped in stability a lot. If you are doing say 3 streams on a regular internet connection: you should be fine. If you're trying to do a huge 1,000, then maybe you need to split up those streams and put it onto a VPS. Just my experience with getting around this issue.

Also, when you pop from the stack, check if your "data" is False to do any computation. You want to CLEAN OUT your stack frame ASAP so that requests don't pile up, THEN do your compute when you have iterated all the latest market data. This isn't really to do with the crash in question, but it seems to have helped both the crashing and my architectural descisions. Lmk if anyone else has questions with this crash/API.

casper-hansen commented 2 years ago

The real question is why we do not have a handler or state of the manager that says stream_is_crashing is true. If we had that, we could easily recover the lost data.

halesyy commented 2 years ago

@casperbh96 yeah, there's a few things that could be really improved as far as management of the streams goes. I'm not very versed with Python streams, otherwise I'd make a merge/commit.

oliver-zehentleitner commented 2 years ago

This is mostly not a problem of your code, but often of your system (internet, hardware). If the backlog gets to big, the ping/pong cant be received within the valid timewindows: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/discussions/255

I would need full log files to investigate this further.

The real question is why we do not have a handler or state of the manager that says stream_is_crashing is true. If we had that, we could easily recover the lost data.

Oh, we have :)

Just call this and look what you get: https://unicorn-binance-websocket-api.docs.lucit.tech/unicorn_binance_websocket_api.html#unicorn_binance_websocket_api.manager.BinanceWebSocketApiManager.get_stream_info

Or more elegant is activating the stream signals, you can use it by accessing the buffer or by using a callback function: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/wiki/%60stream_signal_buffer%60

casper-hansen commented 2 years ago

This is mostly not a problem of your code, but often of your system (internet, hardware). If the backlog gets to big, the ping/pong cant be received within the valid timewindows: #255

I would need full log files to investigate this further.

The real question is why we do not have a handler or state of the manager that says stream_is_crashing is true. If we had that, we could easily recover the lost data.

Oh, we have :)

Just call this and look what you get: https://unicorn-binance-websocket-api.docs.lucit.tech/unicorn_binance_websocket_api.html#unicorn_binance_websocket_api.manager.BinanceWebSocketApiManager.get_stream_info

Or more elegant is activating the stream signals, you can use it by accessing the buffer or by using a callback function: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/wiki/%60stream_signal_buffer%60

It is indeed very elegant! Is this how you envisioned it to be used?

    def handler(self, manager: BinanceWebSocketApiManager):
        while True:
            if manager.is_manager_stopping():
                return

            signal_data = manager.pop_stream_signal_from_stream_signal_buffer()
            order_data = manager.pop_stream_data_from_stream_buffer()

            if signal_data is False:
                time.sleep(0.01)
            else:
                try:
                    self.handle_signal_data(signal_data)
                except Exception:
                    manager.add_to_stream_signal_buffer(signal_data['type'], self.data_stream, signal_data)

            if order_data is False:
                time.sleep(0.01)
            else:
                try:
                    self.handle_order_data(order_data)
                except Exception:
                    manager.add_to_stream_buffer(order_data)
lukaszbinden commented 1 year ago

Update wrt my post above from Aug 10, 2022: once I switched from VPS to an OS running on bare metal hardware, the described issue "Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received" more or less disappeared, rarely occurs if ever. It works very well now.

oliver-zehentleitner commented 1 year ago

Just to add to this, when I check in on my program after a while and I see it's been rolling these exceptions, what seems to fix it is by holding the terminal (to pause the main program) then releasing it. This seems to cause the stream_is_crashing function to get called, which then re-connects all the streams, and gets everything going again. I feel that this means that the disconnect/reconnect logic isn't working? Unsure.

Further, setting the no. of markets per-stream has helped in stability a lot. If you are doing say 3 streams on a regular internet connection: you should be fine. If you're trying to do a huge 1,000, then maybe you need to split up those streams and put it onto a VPS. Just my experience with getting around this issue.

Also, when you pop from the stack, check if your "data" is False to do any computation. You want to CLEAN OUT your stack frame ASAP so that requests don't pile up, THEN do your compute when you have iterated all the latest market data. This isn't really to do with the crash in question, but it seems to have helped both the crashing and my architectural descisions. Lmk if anyone else has questions with this crash/API.

Sure if you pause the script in the terminal the streams stop working, after waking the script up to life again, the streams start working again and recognize the timeout and reconnects...

oliver-zehentleitner commented 1 year ago

First of all, thanks for sharing this great and useful library! it is nicely designed.

Unfortunately, I am experiencing the same issue as described above. IMO the issue's occurrence seems OS dependent. When started on Ubuntu 20.04, the error "sent 1011 (unexpected error) keepalive ping timeout; no close frame received" occurs within 5 mins for the first time and continues to repeat (tested multiple times). However, when I run the same code on Windows 10 Pro. the error does not show up within 10 minutes (not tested extensively though). Tried with parameter ping_timeout_default=10 and ping_timeout_default=50, but seemed to have no effect.

Version of this library. unicorn-binance-websocket-api 1.41.0 unicorn-fy 0.11.1

Hardware? Virtual private server

Operating System? Ubuntu 20.04 (64 Bit) / Linux 5.4.0-65-generic #73-Ubuntu SMP Mon Jan 18 17:25:17 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Python version? Python 3.7.11

Logging output 2022-08-10 08:17:00,367 | CRITICAL | _create_stream_thread: stream_id=40a0ae1a72eb-8058-a62f-46f2-dcba4082, time=1660112193.3642814 | BinanceWebSocketApiSocket.start_socket(40a0ae1a72eb-8058-a62f-46f2-dcba4082, ['arr'], ['!bookTicker']) - Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received 2022-08-10 08:17:00,367 | CRITICAL | _create_stream_thread: stream_id=40a0ae1a72eb-8058-a62f-46f2-dcba4082, time=1660112193.3642814 | BinanceWebSocketApiManager.stream_is_crashing(40a0ae1a72eb-8058-a62f-46f2-dcba4082) 2022-08-10 08:17:49,941 | CRITICAL | _create_stream_thread: stream_id=e2ccc73dc223-77ba-fd62-9567-4557fcd8, time=1660112185.3410606 | BinanceWebSocketApiSocket.start_socket(e2ccc73dc223-77ba-fd62-9567-4557fcd8, {'bookTicker'}, ['enjusdt', 'api3usdt', 'zrxusdt', 'peopleusdt', 'lrcusdt', 'c98usdt', 'xmrusdt', 'aaveusdt', 'dogeusdt', 'trbusdt', 'flmusdt', 'arusdt', 'rvnusdt', 'cotiusdt', 'dotusdt', 'tlmusdt', 'neousdt', 'bandusdt', 'antusdt', 'sklusdt', 'crvusdt', 'manausdt', 'rayusdt', 'celousdt', 'srmusdt', 'hntusdt', 'rlcusdt', 'blzusdt', 'jasmyusdt', 'ontusdt', 'reefusdt', 'sushiusdt', 'duskusdt', 'cvcusdt', 'zilusdt', 'flowusdt', 'darusdt', 'egldusdt', 'sxpusdt', 'gtcusdt', 'hotusdt', 'zecusdt', 'axsusdt', 'algousdt', 'filusdt', 'oneusdt', 'batusdt', 'thetausdt', 'bnxusdt', 'renusdt', 'aliceusdt', 'gmtusdt', 'dydxusdt', 'lptusdt', 'nknusdt', 'yfiusdt', 'adausdt', 'belusdt', 'ksmusdt', 'linausdt', 'fttusdt', 'btcusdt', 'ensusdt', 'apeusdt', 'oceanusdt', 'atomusdt', 'ctkusdt', 'sfpusdt', 'trxusdt', 'ankrusdt', 'dashusdt', 'hbarusdt', 'imxusdt', 'dgbusdt', 'ognusdt', 'ctsiusdt', 'woousdt', 'btsusdt', 'klayusdt', 'xlmusdt', 'xemusdt', 'snxusdt', 'icpusdt', 'tomousdt', 'eosusdt', 'iotxusdt', 'audiousdt', 'chrusdt', 'arpausdt', 'bnbusdt', 'ltcusdt', 'stmxusdt', 'nearusdt', 'atausdt', 'icxusdt', 'qtumusdt', 'wavesusdt', 'etcusdt', 'maskusdt', 'balusdt', 'galusdt', 'zenusdt', 'opusdt', 'chzusdt', 'grtusdt', 'kavausdt', 'vetusdt', 'bchusdt', 'dentusdt', 'xrpusdt', 'ftmusdt', 'celrusdt', 'storjusdt', 'iotausdt', 'iostusdt', 'roseusdt', 'kncusdt', 'scusdt', 'unfiusdt', 'galausdt', 'bakeusdt', 'maticusdt', '1inchusdt', 'rsrusdt', 'mkrusdt', 'ethusdt', 'sandusdt', 'litusdt', 'compusdt', 'runeusdt', 'xtzusdt', 'omgusdt', 'solusdt', 'linkusdt', 'mtlusdt', 'uniusdt', 'alphausdt', 'avaxusdt']) - Exception ConnectionClosed - error_msg: sent 1011 (unexpected error) keepalive ping timeout; no close frame received 2022-08-10 08:17:49,942 | CRITICAL | _create_stream_thread: stream_id=e2ccc73dc223-77ba-fd62-9567-4557fcd8, time=1660112185.3410606 | BinanceWebSocketApiManager.stream_is_crashing(e2ccc73dc223-77ba-fd62-9567-4557fcd8)

Processing method? Worked with process_stream_data first, then changed to pop_stream_data_from_stream_buffer() with separate Thread as described in best practices. however it did not seem to have an impact on the error.

Used endpoint? binance.com and binance.com-futures, each with about 140 markets and channel bookTicker.

Issue as described above.

Code `

    self._ws_api_manager_spot = BinanceWebSocketApiManager(exchange="binance.com",
                                                           throw_exception_if_unrepairable=True,
                                                           ping_timeout_default=10)
    self._ws_api_manager_futures = BinanceWebSocketApiManager(exchange="binance.com-futures",
                                                              throw_exception_if_unrepairable=True,
                                                              ping_timeout_default=10)

    spot_channels = {'bookTicker'}
    self._stream_id_spot_bookTicker = self._ws_api_manager_spot.create_stream(spot_channels, markets,
                                                                              api_key=self._binance_config['apiKey'],
                                                                              api_secret=self._binance_config['secret'],
                                                                              output="dict")

    self._stream_id_futures_bookTicker = self._ws_api_manager_futures.create_stream("arr", "!bookTicker",
                                                                                    api_key=self._binance_config['apiKey'],
                                                                                    api_secret=self._binance_config['secret'],
                                                                                    output="dict")

    spot_stream_worker = threading.Thread(target=self._process_stream_ticker_spot, name="SPOT-StreamWorker")
    spot_stream_worker.start()

def _process_stream_ticker_spot(self):
    time.sleep(30)
    while True:
        if self._ws_api_manager_spot.is_manager_stopping():
            LOG.error("_process_stream_ticker_spot(): _ws_api_manager_spot is stopping, thus exiting!")
            exit(0)

        stream_data = self._ws_api_manager_spot.pop_stream_data_from_stream_buffer()
        if stream_data is False:
            time.sleep(0.01)
        else:
            self._process_ticker_spot_msg(stream_data)

`

I bet (not sure) this is not OS dependent, the both systems have different power, i guess the win pc has more power and the backlog of the streams starts later to become a problem.

oliver-zehentleitner commented 1 year ago

A good advice, look with htop or an other taskmanager how much system power is used.

In Telegram we get a lot of similar requests and it has always been an overloaded CPU... Read this for further information: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/discussions/255