LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

Memory buildup on websocket activation/deactivation #273

Closed hawkeye-bot closed 2 years ago

hawkeye-bot commented 2 years ago

Version of this library.

unicorn_fy: 0.12.2
unicorn_binance_local_depth_cache: not found
unicorn_binance_rest_api: 1.4.3
unicorn_binance_trailing_stop_loss: not found
unicorn_binance_websocket_api: 1.40.7

Solution to Issue cannot be found in the documentation or other Issues and also occurs in the latest version of this library.

Hardware?

VPS or other cloud hosting

Operating System?

Linux

Python version?

Python3.9

Installed packages

Cython==0.29.28
unicorn-binance-websocket-api==1.40.7
unicorn-binance-rest-api==1.4.3
unicorn-fy==0.12.2
pandas==1.4.2
hjson==3.0.2
SQLAlchemy==1.4.35
numpy==1.22.3
flake8==4.0.1
oandapyV20==0.7.2
jsons==1.6.1
pytest-bdd==5.0.0
bbd-table-parser==0.0.1
ccxt==1.78.43
pybit==2.1.0
pyyaml==6.0
GitPython==3.1.27
pid==3.0.4
mplfinance==0.12.8b9
kneed==0.7.0
scikit-learn==1.0.2
# UI
urwid==2.1.2
panwid==0.3.5
psutil==5.9.0

Logging output

No response

Processing method?

stream_buffer

Used endpoint?

binance.com-futures

Issue

My software dynamically connects to different symbols over time. As such, I want to close the websockets properly when I'm no longer interested in a specific symbol. I've noticed that memory usage grows over time, and I've pinpointed it to (perhaps my incorrect usage of) the unicorn library. I've set up a small sample script that showcases what's going wrong. As I said, perhaps it's because I'm using the library incorrectly; if so, a working sample would be very much appreciated!

unicorn_memleak.py.txt

oliver-zehentleitner commented 2 years ago

I think that deleting a variable does not mean its memory is freed immediately! (-> Google: python garbage collector).

Try gc.collect(). I have never used it, but it could help in your script: https://docs.python.org/3/library/gc.html#gc.collect
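
A minimal sketch of what gc.collect() does, independent of this library and not taken from the attached script. Objects that reference each other are not freed by reference counting when their last name goes away; they stay in memory until the cyclic collector runs. The Node class is hypothetical and only serves to build such a cycle.

```python
import gc
import weakref


class Node:
    """Hypothetical object that ends up in a reference cycle."""

    def __init__(self):
        self.partner = None


def make_cycle():
    """Create two objects that reference each other; return a weak reference to one."""
    a, b = Node(), Node()
    a.partner, b.partner = b, a
    return weakref.ref(a)


gc.disable()                          # keep the automatic collector out of the demonstration
probe = make_cycle()                  # the local names are gone once the function returns
alive_before = probe() is not None    # the cycle still keeps both objects alive

gc.collect()                          # force a full collection pass
alive_after = probe() is not None     # the cycle has now been reclaimed
gc.enable()

print(alive_before, alive_after)
```

Even after a collection, the resident memory of the process may not shrink right away, because the allocator can keep freed memory around for reuse.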

ws_manager.kill_stream(stream_id) does not need to be used! Just creating and stopping the stream is enough; the rest is done by the manager.

I would prefer subscribing 200 different markets to one stream instead of creating one stream for each market symbol, which is overhead. Just subscribe and unsubscribe (this is also faster than creating a new stream and in most cases also faster in processing).
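
The single-stream approach could be sketched as follows. The helper sync_markets() is hypothetical; it assumes the manager object exposes subscribe_to_stream() and unsubscribe_from_stream() that take a stream id and a markets list, so check the library documentation for the exact signatures in your version.

```python
def sync_markets(manager, stream_id, current, wanted):
    """Bring one stream's market subscriptions in line with `wanted`.

    `manager` is assumed to expose subscribe_to_stream() and
    unsubscribe_from_stream(), each taking a stream id and a `markets` list.
    Returns the new set of subscribed markets.
    """
    current, wanted = set(current), set(wanted)
    to_add = sorted(wanted - current)     # markets we are newly interested in
    to_drop = sorted(current - wanted)    # markets we no longer care about
    if to_add:
        manager.subscribe_to_stream(stream_id, markets=to_add)
    if to_drop:
        manager.unsubscribe_from_stream(stream_id, markets=to_drop)
    return wanted
```

In real use, manager would be the BinanceWebSocketApiManager instance and stream_id the value returned when the one shared stream was created; the caller keeps the returned set and passes it in as current on the next call.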

What you are doing with the while data_received is False: block can be done with if ws_manager.wait_till_stream_has_started(stream_id):.

https://unicorn-binance-websocket-api.docs.lucit.tech/unicorn_binance_websocket_api.html?highlight=wait_till_stream_has_started#unicorn_binance_websocket_api.manager.BinanceWebSocketApiManager.wait_till_stream_has_started

hawkeye-bot commented 2 years ago

Thanks for the reply @oliver-zehentleitner; using a single stream with (un)subscribe, I can get it working without running into memory issues. The main reason I used one stream per symbol instead of (un)subscribe was that it's much harder to process symbol-specific events over a single stream. For example, if I'm only interested in the latest trade tick, I don't know how to make sure the stream holds only the last tick for each symbol instead of piling up ticks of different symbols. Also, LIFO doesn't help me much in my situation, because it just pours all events onto a single stream to pop from.

I can make it work, but need to work around a number of things from my own codebase in order to make it processable the way I want/need.

oliver-zehentleitner commented 2 years ago

You can split the stream_buffer into your own symbol-specific buffers with a few lines of code and end up with the same interface.

Create buffer[symbol] = deque(); then you have a pipe/stack for each symbol. Then pop everything from the original stream_buffer and write it to the symbol-specific buffer. Then you have the same thing that a stream_buffer per symbol stream was. Just pop from the new symbol-specific buffer as you did before...
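
The per-symbol buffers described above can be sketched as below. This is an illustration under assumptions, not the library's own code: manager is expected to offer pop_stream_data_from_stream_buffer() and to signal an empty buffer with False or None, and the messages are assumed to be raw Binance JSON where the symbol sits under "s" (inside "data" for combined streams). The helper names symbol_of() and drain_into_symbol_buffers() are hypothetical.

```python
import json
from collections import defaultdict, deque


def symbol_of(raw):
    """Extract the symbol from one raw message, or None if there is none.

    Assumes Binance's JSON layout, where the payload may sit under "data"
    and the symbol under "s"; adjust this for other output formats.
    """
    try:
        message = json.loads(raw) if isinstance(raw, (str, bytes)) else raw
    except ValueError:
        return None
    if not isinstance(message, dict):
        return None
    payload = message.get("data", message)
    return payload.get("s") if isinstance(payload, dict) else None


def drain_into_symbol_buffers(manager, buffers):
    """Move everything from the shared stream_buffer into per-symbol deques."""
    moved = 0
    while True:
        raw = manager.pop_stream_data_from_stream_buffer()
        if raw is False or raw is None:   # assumed "buffer is empty" signal
            return moved
        symbol = symbol_of(raw)
        if symbol is not None:            # skip messages without a symbol
            buffers[symbol].append(raw)
            moved += 1


# maxlen=1 keeps only the latest tick per symbol; use deque() to keep them all.
latest_ticks = defaultdict(lambda: deque(maxlen=1))
```

Calling drain_into_symbol_buffers(manager, latest_ticks) regularly and then reading latest_ticks["BTCUSDT"] gives the same pop-from-a-buffer interface per symbol, while a bounded deque avoids piling up old ticks.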

hawkeye-bot commented 2 years ago

> You can split the stream_buffer into your own symbol-specific buffers with a few lines of code and end up with the same interface.
>
> Create buffer[symbol] = deque(); then you have a pipe/stack for each symbol. Then pop everything from the original stream_buffer and write it to the symbol-specific buffer. Then you have the same thing that a stream_buffer per symbol stream was. Just pop from the new symbol-specific buffer as you did before...

Yes, I've implemented something like this in my code now, and it works fine. Another thing I noticed is that when I subscribe a new market to a stream, it takes a while before the stream picks it up, and the same goes for unsubscribing. Is there anything specific I can do to get a newly subscribed market activated IMMEDIATELY when it's subscribed? Also, it can currently take around 10 seconds before a socket is identified as crashed and is then successfully restarted.

Thanks so much for the answers so far by the way, apologies for my lack of knowledge on the library probably!

oliver-zehentleitner commented 2 years ago

Hm. This depends on your receives: every time you receive an item, the stream looks for a new (un)subscribe payload and sends it. If you receive only once every 10 seconds...

But there is the asyncio wait_for() function; we can use it for timeouts and make sending independent of the receiving time. It slows down receiving speed by about 50%, though, so I only activate it automatically for userdata streams. We could also implement it for normal streams with an opt-in switch...
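
The idea of a receive with a timeout can be illustrated with plain asyncio. This is a sketch of the general technique, not the library's internal code: two asyncio.Queue objects stand in for the websocket's receive side and for the queue of pending (un)subscribe payloads, and the "STOP" sentinel exists only to end the demo.

```python
import asyncio


async def receive_loop(incoming, outgoing, sent, handled, timeout=0.05):
    """Receive with a timeout so queued payloads go out even on a quiet stream."""
    while True:
        try:
            item = await asyncio.wait_for(incoming.get(), timeout=timeout)
        except asyncio.TimeoutError:
            item = None                      # nothing arrived within the timeout
        # Flush pending payloads on every pass, whether or not data arrived.
        while not outgoing.empty():
            sent.append(outgoing.get_nowait())
        if item == "STOP":                   # hypothetical sentinel to end the demo
            return
        if item is not None:
            handled.append(item)


async def demo():
    incoming, outgoing = asyncio.Queue(), asyncio.Queue()
    sent, handled = [], []
    task = asyncio.create_task(receive_loop(incoming, outgoing, sent, handled))
    await outgoing.put({"method": "SUBSCRIBE", "params": ["ethusdt@trade"], "id": 1})
    await asyncio.sleep(0.2)                 # no data arrives during this window
    sent_while_quiet = list(sent)            # payloads flushed without any receive
    await incoming.put("STOP")
    await task
    return sent_while_quiet


sent_while_quiet = asyncio.run(demo())
print(sent_while_quiet)
```

Without the timeout, the loop would block in the receive call and the subscribe payload would wait until the next message arrives, which matches the delay described in this thread.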

To recognize a broken stream faster, play with ping_interval=1, ping_timeout=3 and close_timeout=1. The documentation explains this!
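
A sketch of how these values might be passed, assuming create_stream() accepts ping_interval, ping_timeout and close_timeout as keyword arguments as suggested above; verify this against the documentation for your version. The helper create_watched_stream() is hypothetical.

```python
# Values suggested in this thread; tune them for your connection quality.
FAST_FAILURE_DETECTION = {
    "ping_interval": 1,   # seconds between keepalive pings
    "ping_timeout": 3,    # seconds to wait for the pong before giving up
    "close_timeout": 1,   # seconds to wait for the closing handshake
}


def create_watched_stream(manager, channels, markets):
    """Create a stream with aggressive keepalive settings (assumed kwargs)."""
    return manager.create_stream(channels, markets, **FAST_FAILURE_DETECTION)
```

Very short intervals detect a dead connection sooner but can also cause restarts on a slow or lossy network.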

So I'll close this, and if you have other questions, please open a new issue!

best regards, Oliver