LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

add support to run streams in a subprocess #126

Closed oliver-zehentleitner closed 4 months ago

oliver-zehentleitner commented 3 years ago

Is your feature request related to a problem? Please describe.

This lib uses multi-threading, which is not truly parallel in Python: because of the GIL, only one thread executes Python code at a time, so all threads run sequentially in turns.

Describe the solution you'd like

Wrap streams in processes instead of threads to bypass the GIL.

kimchirichie commented 3 years ago

If the streams become subprocesses, would that not hinder the main process's ability to terminate a stream? How would you kill the stream?

oliver-zehentleitner commented 3 years ago

I need to test it out. I have no experience with that, but I have read some articles, and when you create a new process I think you get its PID as a result, so it should be easy to kill or stop the process. The processes can also exchange data with each other through a multiprocessing.Queue().

https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
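A minimal sketch of the idea in this comment, using only the standard library. The worker is a hypothetical stand-in for a websocket receive loop (`stream_worker`, its parameters and the fake messages are assumptions for illustration, not part of this lib). It shows that the parent gets the child's PID, receives data through a `multiprocessing.Queue`, and can stop the child either cooperatively or with `terminate()`.

```python
import multiprocessing as mp
import time


def stream_worker(out_queue, stop_event, name="demo-stream", max_messages=None):
    """Hypothetical stand-in for a receive loop: pushes fake messages to the parent."""
    sent = 0
    while not stop_event.is_set():
        out_queue.put({"stream": name, "seq": sent})
        sent += 1
        if max_messages is not None and sent >= max_messages:
            break
        time.sleep(0.01)
    return sent


if __name__ == "__main__":
    messages = mp.Queue()
    stop = mp.Event()
    proc = mp.Process(target=stream_worker, args=(messages, stop), daemon=True)
    proc.start()
    print("child PID:", proc.pid)

    # Data flows from the child to the parent through the queue.
    print("first message:", messages.get(timeout=5))

    # Ask the child to stop; fall back to terminate() if it does not react.
    stop.set()
    proc.join(timeout=2)
    if proc.is_alive():
        proc.terminate()
        proc.join()
    print("exit code:", proc.exitcode)
```

The stop event is the gentler option because the child can finish its current message; `terminate()` is only the fallback if the child hangs.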

kimchirichie commented 3 years ago

Interesting. I wonder if the benefits of multiprocessing are significant enough. I think parallel processing is mostly useful for computational scripts. In our case, it's network I/O.

oliver-zehentleitner commented 3 years ago

If you don't run it in parallel with multiprocessing, then you are limited to one CPU core! Even with 8 cores, your script can use only 12.5% of the system's CPU power.

kimchirichie commented 3 years ago

You're 100% right. But does it need more than 1 core? It's network I/O, and most of the time the process is idle. Threading may be sufficient (unless there are hundreds of connections).
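A small sketch of why threads can be enough for waiting on I/O: a blocking wait releases the GIL, so several waiting threads overlap. Here `time.sleep` is an assumed stand-in for a blocking network receive, and `fake_receive` and `run_threads` are hypothetical names. It says nothing about the CPU cost of parsing and handling the received data, which still runs on one core.

```python
import threading
import time


def fake_receive(delay, results, index):
    """Stand-in for a blocking network read; sleeping releases the GIL."""
    time.sleep(delay)
    results[index] = f"message-{index}"


def run_threads(n_streams=4, delay=0.2):
    """Run n_streams waiting threads and return their results and the wall time."""
    results = [None] * n_streams
    threads = [
        threading.Thread(target=fake_receive, args=(delay, results, i))
        for i in range(n_streams)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_threads()
    # The waits overlap, so the total is close to one delay, not four.
    print(results, f"{elapsed:.2f}s")
```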

oliver-zehentleitner commented 3 years ago

Try this, and it's just throwing away the received data... not saving it to a database or anything else...

I think asyncio is very CPU intensive.

oliver-zehentleitner commented 3 years ago

But you are right, it's more of a theoretical problem; 99.99% of all users will not be affected. It's very uncommon to stream everything, and this does not make sense except for testing this lib :) - but if I find the time, I think I will try it.

kosmodromov commented 3 years ago

Actually it makes sense, because in periods of high volatility network traffic increases significantly, along with the CPU load that the app logic also needs. This happens when streaming multiple timeframes on Binance Futures, for instance.

Indiana3714 commented 2 years ago

As far as I know, due to the GIL, only one thread is actually running in Python at any time, even when you're multithreaded (https://hackernoon.com/concurrent-programming-in-python-is-not-what-you-think-it-is-b6439c3f3e6a and https://tenthousandmeters.com/blog/python-behind-the-scenes-13-the-gil-and-its-effects-on-python-multithreading/), unlike other languages that have true multithreading. Sadly, right now it's pretty hard to use the library to subscribe to orderbook updates every 500ms, for example with "depth5": when you have around 30 symbol subscriptions, the data received from the library lags by a minimum of 10 seconds or so (and this figure keeps growing). I'm still thinking about how to handle it in my project. Maybe I'll spawn one subprocess and one manager per channel and pool the messages using PyZMQ (since that uses Cython internally and should be fast, beating the built-in multiprocessing pipe or queue).
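A rough sketch of the "one subprocess per channel, pooled messages" layout described here. To stay within the standard library it swaps PyZMQ for a `multiprocessing.Queue`, so it illustrates the fan-in structure only, not the performance argument. `channel_worker`, `drain` and the fake updates are hypothetical; a real worker would run its own websocket manager.

```python
import multiprocessing as mp

SENTINEL = None  # marks "this worker is done"


def channel_worker(symbol, out_queue, n_updates=3):
    """Hypothetical per-channel worker: emits fake updates, then a sentinel."""
    for seq in range(n_updates):
        out_queue.put((symbol, seq))
    out_queue.put((symbol, SENTINEL))


def drain(in_queue, n_workers):
    """Collect updates until every worker has sent its sentinel."""
    finished = 0
    collected = []
    while finished < n_workers:
        symbol, payload = in_queue.get()
        if payload is SENTINEL:
            finished += 1
        else:
            collected.append((symbol, payload))
    return collected


if __name__ == "__main__":
    symbols = ["btcusdt", "ethusdt", "bnbusdt"]
    pool = mp.Queue()
    workers = [mp.Process(target=channel_worker, args=(s, pool)) for s in symbols]
    for worker in workers:
        worker.start()
    updates = drain(pool, len(workers))  # drain before join so no worker blocks
    for worker in workers:
        worker.join()
    print(len(updates), "updates received")
```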

CharlyEmpereurmot commented 2 years ago

Hi!

Indeed, when starting a stream, it would be absolutely great to be able to choose between threads and processes. In the meantime, to work around this and use processes, maybe something in this spirit would work?

For some reason, stream_payload and stream_signal are currently always False, while the same code without multiprocessing worked fine. Do you know the origin of this problem? Does the test code below make sense to you, or is there a fundamental flaw I'm not seeing?

Is it even OK to try to pass the BinanceWebSocketApiManager objects as arguments like this?

from multiprocessing import Process, Manager
import unicorn_binance_websocket_api
from datetime import datetime
import pytz, time, json

def handle_stream(bwsam, data, streams_uuids):

    while True:

        if bwsam.is_manager_stopping():
            exit(0)
        stream_payload = bwsam.pop_stream_data_from_stream_buffer()
        stream_signal = bwsam.pop_stream_signal_from_stream_signal_buffer()

        # process signal from websocket
        if stream_signal is not False:
            pair, period = streams_uuids[stream_signal['stream_id']]

            if stream_signal['type'] == 'CONNECT':
                if not data[pair]['stream_connected']:
                    data[pair]['stream_connected'] = True
                    print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} -- WARNING: Stream connected")

            elif stream_signal['type'] == 'DISCONNECT':
                if data[pair]['stream_connected']:
                    data[pair]['stream_connected'] = False
                    print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} -- WARNING: Stream disconnected")

        # process payload from websocket
        if stream_payload is False:
            time.sleep(0.1)
        else:
            stream_payload = json.loads(stream_payload)
            if 'stream' not in stream_payload:
                continue
            pair, datatype = stream_payload['stream'].split('@')
            period = datatype.split('_')[-1]

            # stream data
            open_ts = stream_payload['data']['k']['t']
            close_price = stream_payload['data']['k']['c']

            print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} @ {open_ts} -- Price: {close_price}")

if __name__ == "__main__":

    mp_manager = Manager()
    data = mp_manager.dict()
    streams_uuids = mp_manager.dict()

    bwsam_1 = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com", enable_stream_signal_buffer=True)
    stream_uuid_1 = bwsam_1.create_stream(channels=['kline_1m'], markets=['btcusdt'])
    data['btcusdt'] = {
        'period': '1m',
        'stream_connected': False,
        'stream_uuid': stream_uuid_1
    }
    streams_uuids[stream_uuid_1] = 'btcusdt', '1m'

    bwsam_2 = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com", enable_stream_signal_buffer=True)
    stream_uuid_2 = bwsam_2.create_stream(channels=['kline_5m'], markets=['ethusdt'])
    data['ethusdt'] = {
        'period': '5m',  # matches the kline_5m channel subscribed above
        'stream_connected': False,
        'stream_uuid': stream_uuid_2
    }
    streams_uuids[stream_uuid_2] = 'ethusdt', '5m'

    p1 = Process(target=handle_stream, args=(bwsam_1, data, streams_uuids))
    p2 = Process(target=handle_stream, args=(bwsam_2, data, streams_uuids))
    p1.start()
    p2.start()
    print('Processes start')

oliver-zehentleitner commented 2 years ago

The problem with subprocesses (and in your script) is that in-memory objects are not shared between processes.

Example: you start a script (process 1) and initiate an object in this process. Then you create a new process (process 2) and pass it the initiated object. This object is copied and is not a reference, so process 1 and process 2 use different objects! I just wanted to prove this with a source and found this: https://docs.python.org/3/library/multiprocessing.shared_memory.html Apparently this is new since Python 3.8: https://mingze-gao.com/posts/python-shared-memory-in-multiprocessing/

I'm in a bit of a hurry right now and I'm not quite sure if this is really a solution, but I think this is the direction it should go.
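A minimal sketch of both points, assuming Python 3.8+ for `multiprocessing.shared_memory`. The helper names are hypothetical. The first part shows the copy behaviour described above with a plain list; the second shows a shared memory block that both processes can see. Note that this shares raw bytes only, not a live object such as a BinanceWebSocketApiManager.

```python
import multiprocessing as mp
from multiprocessing import shared_memory


def append_to_list(items):
    """Runs in the child: mutates the child's own copy of the list."""
    items.append("written by child")


def write_to_shared_block(block_name, payload):
    """Attach to an existing shared memory block by name and write bytes into it."""
    block = shared_memory.SharedMemory(name=block_name)
    try:
        block.buf[:len(payload)] = payload
    finally:
        block.close()  # detach only; the creator is responsible for unlink()


if __name__ == "__main__":
    # 1) A plain object is copied into the child, so the parent's list stays empty.
    plain = []
    proc = mp.Process(target=append_to_list, args=(plain,))
    proc.start()
    proc.join()
    print("plain list in parent:", plain)

    # 2) A shared memory block is visible to both processes.
    block = shared_memory.SharedMemory(create=True, size=16)
    try:
        proc = mp.Process(target=write_to_shared_block, args=(block.name, b"hello"))
        proc.start()
        proc.join()
        print("shared block in parent:", bytes(block.buf[:5]))
    finally:
        block.close()
        block.unlink()
```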

oliver-zehentleitner commented 4 months ago

No longer fits directly into the concept - we work with AsyncIO and Kubernetes and no longer need sub-processes.

If someone really urgently needs it, please contact us: https://www.lucit.tech/get-support.html