LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

How to properly delete a stream completely from the manager? #307

Closed oliver-zehentleitner closed 3 months ago

oliver-zehentleitner commented 1 year ago

Discussed in https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/discussions/279

Originally posted by **Scoooooba** June 8, 2022

Hi guys, given the following simple code:

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager

websocket_manager = BinanceWebSocketApiManager()
stream_id = websocket_manager.create_stream('depth', 'BNBBUSD')

When it is executed, entries for this particular stream_id are made in at least the following dictionaries (maybe in more, these were just the ones I was quickly able to find):

websocket_manager.specific_process_stream_data
websocket_manager.stream_threading_lock
websocket_manager.stream_list
websocket_manager.event_loops
websocket_manager.socket_is_ready
websocket_manager.stream_threads
websocket_manager.stream_threading_lock
websocket_manager.websocket_list

After stopping the stream with the `websocket_manager.stop_stream(stream_id)` command, the entries for this stream_id still exist in the different dictionaries. There is a function `websocket_manager.delete_stream_from_stream_list(stream_id)` to remove the stream_id entry from the `websocket_manager.stream_list` dictionary, but as far as I know there are no functions to delete the stream_id from all the other dictionaries.

Wouldn't it make sense to implement (a) function(s) to remove a stream_id completely from the `websocket_manager`, so the manager can be properly cleaned up? Otherwise these dictionaries will grow a lot if several streams are created and stopped over time.

Best regards
Sebastian
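A rough sketch of the kind of cleanup the post asks for, assuming the per-stream attributes listed above behave like plain dictionaries keyed by stream_id; `cleanup_stream` is a hypothetical helper, not part of the library's API:

def cleanup_stream(websocket_manager, stream_id):
    # Hypothetical helper: stop the stream, then drop its entry from every
    # per-stream registry listed above (pop() tolerates missing keys).
    websocket_manager.stop_stream(stream_id)
    for registry in (websocket_manager.specific_process_stream_data,
                     websocket_manager.stream_threading_lock,
                     websocket_manager.stream_list,
                     websocket_manager.event_loops,
                     websocket_manager.socket_is_ready,
                     websocket_manager.stream_threads,
                     websocket_manager.websocket_list):
        registry.pop(stream_id, None)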
Scoooooba commented 3 months ago

Hi Oliver, thanks for providing an update on this issue. I tested your minimal code example:

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
import time

def callback_func(stream_data):
    print(stream_data)

ubwa = BinanceWebSocketApiManager(debug=True)
ubwa.print_summary()
stream_id = ubwa.create_stream('depth20@1000ms', 'BTCUSDT', output='dict', process_stream_data=callback_func)
time.sleep(5)
stopped = ubwa.stop_stream(stream_id)
ubwa.wait_till_stream_has_stopped(stream_id)
ubwa.print_summary()
deleted = ubwa.delete_stream_from_stream_list(stream_id)
ubwa.stop_manager_with_all_streams()

print(f"ubwa.specific_process_stream_data: {ubwa.specific_process_stream_data}")
print(f"ubwa.stream_threading_lock:        {ubwa.stream_threading_lock}")
print(f"ubwa.stream_list:                  {ubwa.stream_list}")
print(f"ubwa.event_loops:                  {ubwa.event_loops}")
print(f"ubwa.socket_is_ready:              {ubwa.socket_is_ready}")
print(f"ubwa.stream_threads:               {ubwa.stream_threads}")
print(f"ubwa.stream_threading_lock:        {ubwa.stream_threading_lock}")
print(f"ubwa.websocket_list:               {ubwa.websocket_list}")

This prints:

ubwa.specific_process_stream_data: {'f74947a228c4-9cb4-b278-b192-0821139f': <function callback_func at 0x0000022B71AFA160>}
ubwa.stream_threading_lock:        {'f74947a228c4-9cb4-b278-b192-0821139f': {'full_lock': <unlocked _thread.lock object at 0x0000022B74344B80>, 'receives_statistic_last_second_lock': <unlocked _thread.lock object at 0x0000022B741BB880>}}
ubwa.stream_list:                  {}
ubwa.event_loops:                  {'f74947a228c4-9cb4-b278-b192-0821139f': <_WindowsSelectorEventLoop running=False closed=True debug=False>}
ubwa.socket_is_ready:              {'f74947a228c4-9cb4-b278-b192-0821139f': True}
ubwa.stream_threads:               {'f74947a228c4-9cb4-b278-b192-0821139f': <Thread(_create_stream_thread: stream_id=f74947a228c4-9cb4-b278-b192-0821139f, time=1710263747.0250547, stopped 4676)>}
ubwa.stream_threading_lock:        {'f74947a228c4-9cb4-b278-b192-0821139f': {'full_lock': <unlocked _thread.lock object at 0x0000022B74344B80>, 'receives_statistic_last_second_lock': <unlocked _thread.lock object at 0x0000022B741BB880>}}
ubwa.websocket_list:               {'f74947a228c4-9cb4-b278-b192-0821139f': <websockets.legacy.client.WebSocketClientProtocol object at 0x0000022B73E5FB90>}

So it seems there is still some remaining data even after removing the stream. I still see potential issues here, because these dictionaries will grow a lot if several streams are created and stopped over time.

I need to mention that I tested it with version 1.46.2, so I don't know if you have already reworked it in newer versions. I'm using your library only in a small hobby project of mine that I'm currently not willing to spend any money on, as long as I'm not fairly certain that it will give me some profits ;)

oliver-zehentleitner commented 3 months ago

Hello!

wait_till_stream_has_stopped() is fixed and the current versions offer a number of improvements.

I am currently preparing a new release which will be published tomorrow. I should still be able to include the point we discussed in "Discussions" (https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/discussions/279#discussioncomment-2914251).

oliver-zehentleitner commented 3 months ago

It's ready and released :)

https://pypi.org/project/unicorn-binance-websocket-api/2.2.0/

Set `auto_data_cleanup_stopped_streams` to `True` and there will be no more remaining data!

ubwa = BinanceWebSocketApiManager(auto_data_cleanup_stopped_streams=True)
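For completeness, a minimal end-to-end sketch of how the new parameter could be used, reusing the calls from the test script above; the assumption that the cleanup runs on roughly a 60-second interval comes from the function quoted further down, so the exact timing may differ:

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
import time

# Sketch: with auto_data_cleanup_stopped_streams=True the manager should
# remove the remaining per-stream data of stopped streams on its own.
ubwa = BinanceWebSocketApiManager(auto_data_cleanup_stopped_streams=True)
stream_id = ubwa.create_stream('depth20@1000ms', 'BTCUSDT', output='dict',
                               process_stream_data=print)
time.sleep(5)
ubwa.stop_stream(stream_id)
ubwa.wait_till_stream_has_stopped(stream_id)
time.sleep(61)  # assumption: give the periodic cleanup one full interval to run
print(f"ubwa.stream_list: {ubwa.stream_list}")
ubwa.stop_manager_with_all_streams()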
Scoooooba commented 3 months ago

Hi Oliver, looking at your source code, the remove_all_data_of_stream_id function seems to do the job!
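For reference, a minimal sketch of calling that method manually after stopping a stream, with the keyword usage taken from the function quoted below; in 2.2.0 the auto_data_cleanup_stopped_streams option is meant to do this automatically:

stopped = ubwa.stop_stream(stream_id)
ubwa.wait_till_stream_has_stopped(stream_id)
# Manually remove every remaining per-stream entry for this stream_id.
ubwa.remove_all_data_of_stream_id(stream_id=stream_id)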

However, I stumbled over the _auto_data_cleanup_stopped_streams function:

    def _auto_data_cleanup_stopped_streams(self, interval=60):
        timestamp_last_check = 0
        while self.is_manager_stopping() is False:
            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Starting with an interval "
                        f"of {interval} seconds!")
            if self.get_timestamp_unix() > timestamp_last_check + interval:
                timestamp_last_check = self.get_timestamp_unix()
                if self.auto_data_cleanup_stopped_streams is True:
                    stopped_streams = []
                    for stream_id in self.stream_list:
                        stopped_streams.append(stream_id)
                    for stream_id in stopped_streams:
                        try:
                            restart_status = self.restart_requests[stream_id].get("status")
                        except KeyError:
                            restart_status = None
                        if self.stream_list[stream_id]['status'] == "stopped" and restart_status != "new":
                            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Removing "
                                        f"all remaining data of stream with stream_id={stream_id} from this instance!")
                            self.remove_all_data_of_stream_id(stream_id=stream_id)
                            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Remaining "
                                        f"data of stream with stream_id={stream_id} successfully removed from this "
                                        f"instance!")
            time.sleep(1)

Isn't that logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Starting with an interval " f"of {interval} seconds!") call on the wrong line? Shouldn't it be before the while statement? Otherwise this log entry will be written/displayed every second.
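For illustration, a sketch of the suggested placement, logging the startup message once before the loop instead of on every iteration (only the relevant lines are shown):

    def _auto_data_cleanup_stopped_streams(self, interval=60):
        # Suggested fix: log the startup message once, before entering the loop.
        logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Starting with an interval "
                    f"of {interval} seconds!")
        timestamp_last_check = 0
        while self.is_manager_stopping() is False:
            if self.get_timestamp_unix() > timestamp_last_check + interval:
                ...  # cleanup logic unchanged from the version quoted above
            time.sleep(1)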

oliver-zehentleitner commented 3 months ago

Right, thank you! I've already fixed it. There will be a new release in a few hours.