LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

Finding and Editing Streams Globally #158

Closed ArnoldWolfstein closed 3 years ago

ArnoldWolfstein commented 3 years ago

Hi, first of all, thanks a lot for this great work.

I'd like to ask: how can I access streams globally, i.e. from anywhere within the same OS/host/container?

For example, let's assume I created user streams for UserA and UserB under the label "all user streams". While these threads and the buffer are running in the background, I want to add/subscribe UserC to the "all user streams" stream by initiating a new BinanceWebSocketApiManager. How can I achieve this?

The same question applies to delete/unsubscribe: how do I find and remove a user from a labeled stream globally?

oliver-zehentleitner commented 3 years ago

Hi! Is there any reason why you run a separate instance of BinanceWebSocketApiManager? If not, just start all three user data streams in one instance:

userA_stream_id = ubwa.create_stream('arr', '!userData', api_key=api_key, api_secret=api_secret)
userB_stream_id = ubwa.create_stream('arr', '!userData', api_key=api_key, api_secret=api_secret)
userC_stream_id = ubwa.create_stream('arr', '!userData', api_key=api_key, api_secret=api_secret)

ArnoldWolfstein commented 3 years ago

Yes, here is the main reason for a separate instance:

The restriction is that we don't yet know the api_key/api_secret of UserC, so we can't create userC_stream_id in the same instance of the manager.

P.S.: Maybe there are other solutions for my use case, but I couldn't see any.

oliver-zehentleitner commented 3 years ago

You don't need to know that at the beginning; you can call ubwa.create_stream() any time you want ...
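A minimal sketch of that idea, assuming the web handler and the manager live in the same process: a small registry that starts a user's stream the first time it is asked for. `UserStreamRegistry` and `ensure_stream` are hypothetical names for illustration, not part of the library; the only library call used is `create_stream()` with the arguments that appear elsewhere in this thread.

```python
import threading


class UserStreamRegistry:
    """Start at most one user data stream per user on one shared manager."""

    def __init__(self, manager):
        # `manager` is assumed to be a BinanceWebSocketApiManager, but any
        # object with a compatible create_stream() method works.
        self._manager = manager
        self._stream_ids = {}          # user_id -> stream_id
        self._lock = threading.Lock()  # handlers may run in several threads

    def ensure_stream(self, user_id, api_key, api_secret):
        """Return the user's stream_id, creating the stream on first call."""
        with self._lock:
            if user_id not in self._stream_ids:
                self._stream_ids[user_id] = self._manager.create_stream(
                    "arr", "!userData",
                    api_key=api_key, api_secret=api_secret,
                    stream_label=f"User_{user_id}",
                )
            return self._stream_ids[user_id]
```

A registration handler could then call `registry.ensure_stream(user.id, api_key, api_secret)` as soon as the keys are saved. This does not help when the web app and the stream manager run as separate processes, since the registry only exists in one of them.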

ArnoldWolfstein commented 3 years ago

Hmm. Let me give you a dry example after reconsidering my implementation. Thank you a lot for such a quick response in the meantime. Will be back asap.

ArnoldWolfstein commented 3 years ago

OK, I tried again but couldn't figure out the solution. I think I'm so single-threaded :)

Let's keep things simple and focus on the main points. This is not a useful or secure way to do it, but it gives a high-level understanding.

We have 2 modules:

src/main.py: After we register a user and save them to the db, we would like to start a user socket for this user.

...

@app.route('/register', methods =['GET', 'POST'])
def register():
    msg = ''
    if request.method == 'POST' and 'email' in request.form and 'password' in request.form:
        # Make some user registration checks: user exists, email valid etc.
        if check_user():
            save_user_to_db()
            save_user_api_keys()

            # Where we implement `create_user_stream` from ubwa
            # ----- HERE ------

            msg = 'You have successfully registered !'

...

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 5000))
    app.run(host="127.0.0.1", port=port)

src/register_user_websocket.py

...

# create instances of BinanceWebSocketApiManager
ubwa = BinanceWebSocketApiManager(exchange="binance.com")

def get_existing_users() -> list:
    """ Get users from db """
    return some_db_query_for_getting_user()

def get_user_api_keys(user_id: int) -> Tuple[str, str]:
    """ Get user api keys from db """
    return some_db_query_for_keys(user_id)

existing_user_list = get_existing_users()

# create user streams for existing users, labeled per user
for user in existing_user_list:
    api_key, api_secret = get_user_api_keys(user_id=user.id)
    ubwa.create_stream("arr", "!userData", api_key=api_key, api_secret=api_secret, stream_label=f"user_stream_{user.id}")

# start a worker thread to move the received stream_data from the stream_buffer to a print function
worker_thread = threading.Thread(target=print_stream_data_from_stream_buffer, args=(ubwa,))
worker_thread.start()

I started register_user_websocket as a background process or system daemon, then started the main module. A new user hits the register page. What would be the way to create the new user's stream?

oliver-zehentleitner commented 3 years ago

In src/register_user_websocket.py, add a while True loop that reads from the database to see whether there are new entries you have to start a user stream for, checks whether a stream is already running for each user and, if not, starts one for them.

import threading
import time
from typing import Tuple

def get_existing_users() -> list:
    """ Get users from db """
    return some_db_query_for_getting_user()

def get_user_api_keys(user_id: int) -> Tuple[str, str]:
    """ Get user api keys from db """
    return some_db_query_for_keys(user_id)

# create instances of BinanceWebSocketApiManager
ubwa = BinanceWebSocketApiManager(exchange="binance.com")

# start a worker thread to move the received stream_data from the stream_buffer to a print function
worker_thread = threading.Thread(target=print_stream_data_from_stream_buffer, args=(ubwa,))
worker_thread.start()

# user IDs that already have a running stream in this process
started_user_ids = set()

while True:
    for user in get_existing_users():
        if user.id not in started_user_ids:
            api_key, api_secret = get_user_api_keys(user_id=user.id)
            ubwa.create_stream("arr", "!userData", api_key=api_key, api_secret=api_secret, stream_label=f"User_{user.id}")
            started_user_ids.add(user.id)
    # wait before polling the database again
    time.sleep(1)
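
The polling loop above only starts streams. For the delete/unsubscribe half of the original question, one possible extension is to reconcile on every pass: start streams for users that appeared in the database and stop streams for users that disappeared. The sketch below is an illustration, not the library's own mechanism. `sync_user_streams` is a hypothetical helper, and it assumes the manager offers `stop_stream(stream_id)` next to `create_stream()`.

```python
def sync_user_streams(manager, running, wanted):
    """Reconcile running streams with the users currently in the database.

    manager: assumed to offer create_stream() and stop_stream(stream_id)
    running: dict user_id -> stream_id, updated in place
    wanted:  dict user_id -> (api_key, api_secret), e.g. read from the db
    Returns (started_user_ids, stopped_user_ids).
    """
    started, stopped = [], []

    # start a stream for every user that does not have one yet
    for user_id, (api_key, api_secret) in wanted.items():
        if user_id not in running:
            running[user_id] = manager.create_stream(
                "arr", "!userData",
                api_key=api_key, api_secret=api_secret,
                stream_label=f"User_{user_id}",
            )
            started.append(user_id)

    # stop streams of users that are no longer in the database;
    # iterate over a copy of the keys because entries are removed
    for user_id in list(running):
        if user_id not in wanted:
            manager.stop_stream(running.pop(user_id))
            stopped.append(user_id)

    return started, stopped
```

Inside the while True loop, `wanted` would be rebuilt from the database on each pass and `running` kept as a module-level dict, so each pass costs one query plus work only for users whose state changed.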

ArnoldWolfstein commented 3 years ago

I see. I'm not fond of infinite loops, but this solves my use case. After looking at the library's classes/modules in more detail, I understand your design philosophy better. Thanks a lot @oliver-zehentleitner!