LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

Memory leak #205

Closed. dima-dmytruk23 closed this issue 2 years ago.

dima-dmytruk23 commented 2 years ago

Hi. We are using Python 3.8, websocket-client==1.1.1, websockets==9.1 and unicorn-binance-websocket-api==1.31.0.

We receive about 600 data objects every 250 ms.

We keep running into memory leaks.

What is specific about our service: every 50 minutes we close the old websocket and open a new one. Are we perhaps doing this incorrectly?

    def toggle_websockets(self, last_time):
        # Called every 50 minutes: open a fresh set of managers/streams, then
        # stop the previous set. channels and markets_list are defined elsewhere.
        if not self.websockets1:
            for markets in markets_list:
                twm = BinanceWebSocketApiManager(
                    exchange="binance.com-futures",
                    warn_on_update=False,
                )
                stream = twm.create_stream(
                    tuple(channels), tuple(markets)
                )
                self.websockets1.append((twm, stream))

            # Stop the previous generation of managers and their streams.
            for ws in self.websockets2:
                ws[0].stop_manager_with_all_streams()
            self.websockets2 = []

        else:
            for markets in markets_list:
                twm = BinanceWebSocketApiManager(
                    exchange="binance.com-futures",
                    warn_on_update=False,
                )
                stream = twm.create_stream(
                    tuple(channels), tuple(markets)
                )
                self.websockets2.append((twm, stream))

            # Stop the previous generation of managers and their streams.
            for ws in self.websockets1:
                ws[0].stop_manager_with_all_streams()
            self.websockets1 = []

tracemalloc points to this line:

  File "/usr/local/lib/python3.8/site-packages/websockets/legacy/protocol.py", line 834
    return frame.data.decode("utf-8") if text else frame.data
oliver-zehentleitner commented 2 years ago

Do you have more info on the memory leak?

I would build your function differently:

  1. Keep the manager instance alive.
  2. Don't stop all streams; use replace_stream() instead. It starts a new stream and closes the old one only once the new one has received its first record.
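The two suggestions above can be sketched as a small helper that owns a single long-lived manager and swaps its streams in place. This is a minimal sketch, not the library's own pattern: StreamRotator is a hypothetical name, and it assumes replace_stream(stream_id, new_channels, new_markets) returns the new stream_id (check the signature of your installed version). The manager is passed in, so the helper can be exercised without a network connection.

```python
from typing import Any, Iterable, List, Set


class StreamRotator:
    """Hypothetical helper: one long-lived manager whose streams are replaced in place."""

    def __init__(self, manager: Any) -> None:
        # Created once and kept alive, e.g.
        # BinanceWebSocketApiManager(exchange="binance.com-futures")
        self.manager = manager
        self.stream_ids: List[Any] = []

    def start(self, channels: Set[str], markets_list: Iterable[Set[str]]) -> None:
        """Open one stream per chunk of markets on the shared manager."""
        for markets in markets_list:
            self.stream_ids.append(
                self.manager.create_stream(tuple(channels), tuple(markets))
            )

    def rotate(self, channels: Set[str], markets_list: Iterable[Set[str]]) -> None:
        """Swap every stream for a fresh one instead of stopping the whole manager."""
        chunks = list(markets_list)
        if len(chunks) != len(self.stream_ids):
            raise ValueError("rotate() expects one markets chunk per running stream")
        # Assumed behaviour of replace_stream(): it starts the new stream, closes
        # the old one once the new stream has received its first record, and
        # returns the new stream_id.
        self.stream_ids = [
            self.manager.replace_stream(old_id, tuple(channels), tuple(markets))
            for old_id, markets in zip(self.stream_ids, chunks)
        ]
```

Calling rotator.rotate(channels, markets_list) every 50 minutes would take the place of toggle_websockets(), and the consumer then only has one manager's stream buffer to drain.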
oliver-zehentleitner commented 2 years ago

Are you creating one stream for each market symbol?

dima-dmytruk23 commented 2 years ago

> I would build your function differently:

Thanks, I will try it.

> Are you creating one stream for each market symbol?

No. I put many symbols into one stream and create a new stream whenever the number of interval/pair combinations would exceed 200.

MAXIMUM_WS_CONNECTIONS = 200

def calculate_channels_number(intervals_count: int) -> int:
    greatest_multiple = max(
        range(intervals_count, MAXIMUM_WS_CONNECTIONS + 1, intervals_count)
    )
    max_pairs_per_ws = greatest_multiple // intervals_count
    return max_pairs_per_ws
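The max(range(...)) expression finds the greatest multiple of intervals_count that does not exceed 200 and then divides intervals_count back out, which is the same as plain floor division. Note that the constant caps interval/pair combinations per stream rather than connections. A minimal equivalent sketch; the range check is an addition, not part of the original code:

```python
MAXIMUM_WS_CONNECTIONS = 200  # cap on interval/pair combinations per stream


def calculate_channels_number(intervals_count: int) -> int:
    """Largest number of pairs such that pairs * intervals_count <= MAXIMUM_WS_CONNECTIONS."""
    if not 1 <= intervals_count <= MAXIMUM_WS_CONNECTIONS:
        raise ValueError(f"intervals_count must be in 1..{MAXIMUM_WS_CONNECTIONS}")
    return MAXIMUM_WS_CONNECTIONS // intervals_count
```

With the five kline intervals used in this thread, this gives 200 // 5 = 40 pairs per stream, i.e. 40 * 5 = 200 subscriptions per stream, which matches the 40-symbol market sets posted later.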

....

    @staticmethod
    def get_channels_and_markets(last_time: dict):
        intervals_count = Interval.objects.count()
        pairs_per_ws = calculate_channels_number(intervals_count)
        # Every stream subscribes to the same kline channels, so build them once
        # instead of re-querying Interval for every pair.
        channels = {f"kline_{interval.alias}" for interval in Interval.objects.all()}
        markets = []
        markets_set = set()
        # Split the active pairs into chunks of pairs_per_ws symbols (one chunk per stream).
        active_pairs = Pair.objects.filter(is_active=True, is_deleted=False)
        for ind, pair in enumerate(active_pairs, start=1):
            markets_set.add(pair.ticker.lower())
            if not ind % pairs_per_ws:
                markets.append(markets_set)
                markets_set = set()
        if markets_set:
            markets.append(markets_set)
        return channels, markets
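Stripped of the Django models, the chunking in get_channels_and_markets() can be sketched as a plain function and checked without a database. chunk_markets is a hypothetical name; it mirrors the original's counter-based split, taking an iterable of ticker strings in place of the Pair queryset:

```python
from typing import Iterable, List, Set


def chunk_markets(tickers: Iterable[str], pairs_per_ws: int) -> List[Set[str]]:
    """Split tickers into sets of at most pairs_per_ws lowercase symbols (one set per stream)."""
    if pairs_per_ws < 1:
        raise ValueError("pairs_per_ws must be at least 1")
    chunks: List[Set[str]] = []
    current: Set[str] = set()
    for ind, ticker in enumerate(tickers, start=1):
        current.add(ticker.lower())
        # Close the current chunk after every pairs_per_ws tickers.
        if ind % pairs_per_ws == 0:
            chunks.append(current)
            current = set()
    if current:  # leftover tickers form a final, smaller chunk
        chunks.append(current)
    return chunks
```

For example, 131 active pairs with pairs_per_ws = 40 split into sets of 40, 40, 40 and 11 symbols, which is the four-stream layout reported in the next comment.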

...

        for markets in markets_list:
            twm = BinanceWebSocketApiManager(
                exchange="binance.com-futures",
                warn_on_update=False,
            )
            stream = twm.create_stream(
                tuple(channels), tuple(markets)
            )
            self.websockets1.append((twm, stream))

        for ws in self.websockets2:
            ws[0].stop_manager_with_all_streams()
        self.websockets2 = []
dima-dmytruk23 commented 2 years ago

@oliver-zehentleitner With these channels and markets I have four streams:

channels: {'kline_1w', 'kline_1M', 'kline_4h', 'kline_1h', 'kline_1d'}
markets: [{'iotausdt', 'enjusdt', 'crvusdt', 'iostusdt', 'sushiusdt', 'mkrusdt', 'bzrxusdt', 'dgbusdt', 'trbusdt', 'rsrusdt', 'btcusdt', 'balusdt', 'ankrusdt', 'atomusdt', 'xrpusdt', 'egldusdt', 'lunausdt', 'avaxusdt', 'eosusdt', 'dogebusd', 'zenusdt', 'zrxusdt', 'bchusdt', 'ftmusdt', 'ctkusdt', 'etcusdt', 'defiusdt', 'ltcusdt', 'xtzusdt', 'unfiusdt', 'dotusdt', 'alphausdt', 'rayusdt', 'tlmusdt', 'gtcusdt', 'compusdt', 'aliceusdt', 'ethusdt', 'scusdt', 'xemusdt'}, {'batusdt', 'renusdt', 'zilusdt', 'flmusdt', 'mtlusdt', 'nknusdt', 'rvnusdt', 'solusdt', 'linkusdt', 'snxusdt', 'xlmusdt', 'rlcusdt', 'srmusdt', 'icxusdt', 'bakeusdt', 'cvcusdt', 'akrousdt', 'ethbusd', 'runeusdt', 'reefusdt', 'sfpusdt', 'ksmusdt', 'wavesusdt', 'oneusdt', 'adausdt', 'sxpusdt', 'iotxusdt', 'kncusdt', 'belusdt', 'oceanusdt', 'hbarusdt', 'blzusdt', 'bandusdt', 'ontusdt', 'audiousdt', 'ognusdt', 'chzusdt', 'dentusdt', 'dogeusdt', 'filusdt'}, {'vetusdt', 'omgusdt', 'hotusdt', 'yfiusdt', 'adabusd', 'cotiusdt', 'kavausdt', 'lrcusdt', 'litusdt', 'aaveusdt', 'bttusdt', 'btcdomusdt', 'celrusdt', 'dashusdt', 'icpusdt', 'keepusdt', 'storjusdt', 'btsusdt', 'maticusdt', 'manausdt', 'btcbusd', 'axsusdt', 'sklusdt', 'chrusdt', 'c98usdt', 'yfiiusdt', 'thetausdt', 'bnbbusd', 'hntusdt', 'trxusdt', 'nearusdt', 'sandusdt', 'linausdt', 'algousdt', 'stmxusdt', 'zecusdt', 'qtumusdt', 'neousdt', 'maskusdt', 'uniusdt'}, {'xmrusdt', '1000shibusdt', 'dodousdt', 'grtusdt', 'xrpbusd', 'dydxusdt', 'bnbusdt', 'fttbusd', 'solbusd', 'tomousdt', '1inchusdt'}]

Also, I tried your suggestions (keeping the manager instance alive and using replace_stream() instead of stopping all streams), but it didn't help.

I used tracemalloc to trace the memory leak and got the following results.

At the start of the process:

[ Top 10 memory usage objects ]
/src/datareader/management/commands/get_data_from_websocket.py:67: size=1656 B, count=5, average=331 B
<frozen importlib._bootstrap>:979: size=648 B, count=2, average=324 B
/src/datareader/management/commands/get_data_from_websocket.py:32: size=576 B, count=1, average=576 B
/usr/local/lib/python3.8/site-packages/django/core/management/__init__.py:40: size=520 B, count=2, average=260 B
/usr/local/lib/python3.8/site-packages/django/core/management/base.py:133: size=272 B, count=2, average=136 B
/src/datareader/management/commands/get_data_from_websocket.py:281: size=184 B, count=2, average=92 B
/src/datareader/management/commands/get_data_from_websocket.py:85: size=184 B, count=2, average=92 B
/src/datareader/management/commands/get_data_from_websocket.py:308: size=136 B, count=1, average=136 B
/src/datareader/management/commands/get_data_from_websocket.py:246: size=136 B, count=1, average=136 B
/src/datareader/management/commands/get_data_from_websocket.py:232: size=136 B, count=1, average=136 B
5 memory blocks: 1.6 KiB
  File "/src/datareader/management/commands/get_data_from_websocket.py", line 67
    class Command(BaseCommand):

After an hour of work:

[ Top 10 memory usage objects ]
/usr/local/lib/python3.8/site-packages/websockets/legacy/protocol.py:834: size=558 MiB, count=1538074, average=381 B
/usr/local/lib/python3.8/site-packages/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py:783: size=12.8 MiB, count=4, average=3275 KiB
/usr/local/lib/python3.8/site-packages/websockets/extensions/permessage_deflate.py:74: size=2097 KiB, count=64, average=32.8 KiB
<frozen importlib._bootstrap_external>:640: size=236 KiB, count=2342, average=103 B
/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py:1336: size=172 KiB, count=1002, average=176 B
/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py:1340: size=172 KiB, count=1000, average=176 B
/usr/local/lib/python3.8/site-packages/websockets/extensions/permessage_deflate.py:133: size=160 KiB, count=5, average=32.0 KiB
/usr/local/lib/python3.8/abc.py:102: size=135 KiB, count=1301, average=106 B
/usr/local/lib/python3.8/site-packages/django/utils/functional.py:48: size=124 KiB, count=89, average=1429 B
/usr/local/lib/python3.8/site-packages/sentry_sdk/tracing.py:722: size=100 KiB, count=198, average=518 B
1538074 memory blocks: 571569.0 KiB
  File "/usr/local/lib/python3.8/site-packages/websockets/legacy/protocol.py", line 834
    return frame.data.decode("utf-8") if text else frame.data

After 2 hours of operation:

[ Top 10 memory usage objects ]
/usr/local/lib/python3.8/site-packages/websockets/legacy/protocol.py:834: size=1125 MiB, count=3097554, average=381 B
/usr/local/lib/python3.8/site-packages/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py:783: size=26.0 MiB, count=4, average=6652 KiB
/usr/local/lib/python3.8/site-packages/websockets/extensions/permessage_deflate.py:74: size=3145 KiB, count=90, average=34.9 KiB
/usr/local/lib/python3.8/site-packages/websockets/extensions/permessage_deflate.py:133: size=288 KiB, count=9, average=32.0 KiB
<frozen importlib._bootstrap_external>:640: size=236 KiB, count=2342, average=103 B
/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py:1336: size=172 KiB, count=1001, average=176 B
/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py:1340: size=172 KiB, count=1000, average=176 B
/usr/local/lib/python3.8/site-packages/sentry_sdk/tracing.py:722: size=146 KiB, count=296, average=507 B
/usr/local/lib/python3.8/linecache.py:137: size=146 KiB, count=1423, average=105 B
/usr/local/lib/python3.8/abc.py:102: size=135 KiB, count=1301, average=106 B
3097554 memory blocks: 1151652.4 KiB
  File "/usr/local/lib/python3.8/site-packages/websockets/legacy/protocol.py", line 834
    return frame.data.decode("utf-8") if text else frame.data

The growth is concentrated in websockets/legacy/protocol.py, line 834, where each received frame is decoded into a string.
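tracemalloc reports where a block was allocated, not which object still references it, so a large count at protocol.py:834 suggests that decoded messages are being kept alive somewhere downstream (for example in a buffer nobody drains). A minimal sketch, using only the standard tracemalloc module, of listing the top allocation sites and comparing two snapshots to see which lines grow; top_allocations and growth_since are hypothetical helper names:

```python
import tracemalloc
from typing import List


def top_allocations(snapshot: tracemalloc.Snapshot, limit: int = 10) -> List[str]:
    """Return the `limit` source lines that currently hold the most traced memory."""
    return [str(stat) for stat in snapshot.statistics("lineno")[:limit]]


def growth_since(
    old: tracemalloc.Snapshot, new: tracemalloc.Snapshot, limit: int = 10
) -> List[tracemalloc.StatisticDiff]:
    """Return the source lines whose traced memory changed the most between snapshots."""
    return new.compare_to(old, "lineno")[:limit]


if __name__ == "__main__":
    tracemalloc.start()
    baseline = tracemalloc.take_snapshot()

    # Simulate the pattern from this issue: decoded messages appended to a
    # buffer that nothing ever drains.
    undrained_buffer = []
    for i in range(10_000):
        undrained_buffer.append(f'{{"k": {i}}}'.encode().decode("utf-8"))

    later = tracemalloc.take_snapshot()
    print("[ Top 10 memory usage objects ]")
    for line in top_allocations(later):
        print(line)
    print("[ Largest changes since baseline ]")
    for diff in growth_since(baseline, later):
        print(diff)
    tracemalloc.stop()
```

Taking the baseline right after the streams start and comparing it with a snapshot some minutes later shows which lines keep growing, which is usually more telling than a single snapshot.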

[Memory usage graph (image not included)]

Any ideas or suggestions as to why this is happening?

oliver-zehentleitner commented 2 years ago

Why did you close it?

dima-dmytruk23 commented 2 years ago

@oliver-zehentleitner There was a bug in my implementation: in each iteration I created a new manager with its own stream, but I only consumed the stream buffer of the most recently created one, so the buffers of the other managers kept filling up. The memory leak is now mostly gone. There is still some slow accumulation (in the websockets legacy protocol and at self.encoder = zlib.compressobj( in websockets/extensions/permessage_deflate.py), but it is no longer critical.
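The fix described above amounts to consuming from every manager, not just the newest one. A minimal sketch, assuming each manager exposes pop_stream_data_from_stream_buffer() with no arguments and returns False when its buffer is empty (check this against your installed version); drain_once and consume_forever are hypothetical names:

```python
import time
from typing import Any, Callable, Iterable


def drain_once(managers: Iterable[Any], handle: Callable[[Any], None]) -> int:
    """Empty the stream buffer of every manager once; return how many records were handled."""
    handled = 0
    for manager in managers:
        while True:
            record = manager.pop_stream_data_from_stream_buffer()
            if record is False:  # assumed "buffer is empty" return value
                break
            handle(record)
            handled += 1
    return handled


def consume_forever(
    managers: Iterable[Any], handle: Callable[[Any], None], idle_sleep: float = 0.01
) -> None:
    """Keep draining all managers, sleeping briefly whenever every buffer was empty."""
    managers = list(managers)
    while True:
        if drain_once(managers, handle) == 0:
            time.sleep(idle_sleep)
```

Because every manager is visited on each pass, no buffer can grow unnoticed while another one is being read.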