LUCIT-Systems-and-Development / unicorn-binance-websocket-api

A Python SDK by LUCIT to use the Binance Websocket APIs (com+testnet, com-margin+testnet, com-isolated_margin+testnet, com-futures+testnet, com-coin_futures, us, tr, dex/chain+testnet) in a simple, fast, flexible, robust and fully-featured way.
https://unicorn-binance-websocket-api.docs.lucit.tech/

Async Read Stream Data from buffer #144

Closed: brianmori closed this issue 2 years ago

brianmori commented 3 years ago

Hello,

I went through the documentation and the examples on how to read data from the stream, and I would like to ask a question.

I found the function pop_stream_data_from_stream_buffer and saw that data can be read from the stream with this snippet:

    import time

    while True:
        # poll both stream buffers, then pause for 10 ms before the next round
        msg = bwsm.pop_stream_data_from_stream_buffer(book_ticker_id)
        if msg:
            book_ticker(msg)
        msg = bwsm.pop_stream_data_from_stream_buffer(user_stream_id)
        if msg:
            order_status(msg)
        time.sleep(0.01)

Would it be possible to read the data asynchronously, without the time.sleep()?

Thank you :)

ffischer42 commented 3 years ago

Hi brianmori :-)

I've only just started using this package, but as far as I understand it, you don't have to use the sleep command. However, your CPU will thank you if you don't check on every iteration whether there is something new in the buffer.

You don't need the sleep while there is still something in the buffer. But if the buffer is empty and you don't wait between checks, the loop busy-waits and wastes CPU.

When monitoring a stream, I call pop_stream_data_from_stream_buffer repeatedly as long as there is something in the buffer (without waiting in between). Once it is empty, I sleep briefly and then start processing the buffer again.
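The drain-then-sleep pattern Felix describes can be sketched as follows. This is a minimal, library-agnostic sketch: `pop` is any callable that returns the next message or `False` when the buffer is empty (the convention the snippets in this thread rely on for pop_stream_data_from_stream_buffer), and `drain_then_sleep` is a hypothetical helper name, not part of the library.

```python
import time
from collections import deque


def drain_then_sleep(pop, handle, idle_sleep=0.01, should_stop=lambda: False):
    """Process everything in the buffer back to back; sleep only when it is empty.

    pop: zero-argument callable returning the next message, or False if empty.
    handle: function applied to each message.
    """
    while not should_stop():
        msg = pop()
        if msg is False:
            time.sleep(idle_sleep)  # buffer empty: back off instead of spinning the CPU
            continue
        handle(msg)  # buffer not empty: process immediately, no sleep


# Usage with a simulated buffer instead of a real manager:
buffer = deque(["a", "b", "c"])
processed = []
drain_then_sleep(
    pop=lambda: buffer.popleft() if buffer else False,
    handle=processed.append,
    should_stop=lambda: len(processed) == 3,  # stop once all three are handled
)
print(processed)  # ['a', 'b', 'c']
```

With a real manager, the call might look like `drain_then_sleep(lambda: bwsm.pop_stream_data_from_stream_buffer(book_ticker_id), book_ticker, should_stop=bwsm.is_manager_stopping)`, reusing the names from the question above.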

Cheers, Felix

jon4hz commented 3 years ago

Hi @brianmori

I've been working on a script that uses UBWA together with asyncio, but truth be told, I'm not an asyncio expert, so I'm really not sure whether it's best practice. I was playing around with the following code:

    import asyncio
    import threading
    import time

    from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager

    binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com")

    async def print_stream_data_from_stream_buffer(binance_websocket_api_manager):
        print("waiting 5 seconds, then we start flushing the stream_buffer")
        await asyncio.sleep(5)
        while True:
            if binance_websocket_api_manager.is_manager_stopping():
                # leave the coroutine cleanly instead of calling exit() inside a worker thread
                return
            oldest_stream_data_from_stream_buffer = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
            if oldest_stream_data_from_stream_buffer is False:
                # buffer is empty: yield to the event loop for 10 ms instead of busy-waiting
                await asyncio.sleep(0.01)
            else:
                try:
                    print(oldest_stream_data_from_stream_buffer)
                except Exception as e:
                    # not able to process the data? write it back to the stream_buffer
                    print(f"Error: {e}")
                    binance_websocket_api_manager.add_to_stream_buffer(oldest_stream_data_from_stream_buffer)

    def run_socket_function(loop, binance_websocket_api_manager):
        # bind the event loop to this worker thread and run the consumer coroutine on it
        asyncio.set_event_loop(loop)
        loop.run_until_complete(print_stream_data_from_stream_buffer(binance_websocket_api_manager))

    if __name__ == "__main__":
        # create an event loop for the worker thread
        loop = asyncio.new_event_loop()

        # subscribe to the miniTicker stream
        binance_websocket_api_manager.create_stream('miniTicker', 'BTCUSDT', output='dict')

        # start a worker thread that moves the received stream_data from the stream_buffer to a print function
        worker_thread = threading.Thread(target=run_socket_function, args=(loop, binance_websocket_api_manager))
        worker_thread.start()
        time.sleep(5)

oliver-zehentleitner commented 3 years ago

What you describe is a sequential loop: you read, process, and only start the next iteration once processing has finished. The difference with async processing is that you start processing the next item even if the current iteration has not finished yet, so the iterations run concurrently.
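To make the distinction concrete, here is a small self-contained asyncio sketch, not tied to the library; the sleeps stand in for slow, awaitable work per message (for example a database insert). In the concurrent version, b finishes before a, which is also why ordering becomes a concern later in this thread.

```python
import asyncio


async def process(item, delay, log):
    log.append(f"start {item}")
    await asyncio.sleep(delay)  # stands in for awaitable I/O, e.g. a database insert
    log.append(f"end {item}")


async def sequential(items, log):
    # loop style: the next item is started only after the previous one has finished
    for item, delay in items:
        await process(item, delay, log)


async def concurrent(items, log):
    # async style: all items are started at once and their waits overlap
    # (concurrently on a single thread, not in parallel)
    await asyncio.gather(*(process(item, delay, log) for item, delay in items))


items = [("a", 0.02), ("b", 0.01)]
seq_log, con_log = [], []
asyncio.run(sequential(items, seq_log))
asyncio.run(concurrent(items, con_log))
print(seq_log)  # ['start a', 'end a', 'start b', 'end b']
print(con_log)  # ['start a', 'start b', 'end b', 'end a']
```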

ffischer42 commented 3 years ago

Yes, that's clear to me. I run that loop as an async task that feeds a kind of inbox. I can't show it very well in Python, though, because I'm using your package via PyCall in Julia ;-) Async programming is not that complicated there:

    @async while true
        msg = bwsm.pop_stream_data_from_stream_buffer(id)
        # if the buffer is not empty, push the message to my inbox; if it is empty, sleep
        if msg != false
            push!(inbox, msg)
        else
            sleep(0.1)
        end
    end
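For Python readers, roughly the same inbox pattern can be sketched with an `asyncio.Queue` as the inbox. Assumptions: a plain function stands in for `bwsm.pop_stream_data_from_stream_buffer(id)` and returns `False` when empty; `poll_into_inbox` and `demo` are hypothetical names. Only the poller ever sleeps; consumers just await the queue.

```python
import asyncio
from collections import deque


async def poll_into_inbox(pop, inbox, stop, idle_sleep=0.1):
    # same shape as the Julia loop: push messages to the inbox, sleep when the buffer is empty
    while not stop.is_set():
        msg = pop()
        if msg is not False:
            await inbox.put(msg)
        else:
            await asyncio.sleep(idle_sleep)


async def demo():
    buffer = deque([{"p": "1"}, {"p": "2"}])

    def pop():
        # stand-in for bwsm.pop_stream_data_from_stream_buffer(id)
        return buffer.popleft() if buffer else False

    inbox = asyncio.Queue()
    stop = asyncio.Event()
    poller = asyncio.create_task(poll_into_inbox(pop, inbox, stop, idle_sleep=0.01))
    # consumers simply await the inbox; no sleep needed on this side
    received = [await inbox.get(), await inbox.get()]
    stop.set()
    await poller
    return received


print(asyncio.run(demo()))  # [{'p': '1'}, {'p': '2'}]
```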

Or have I misunderstood the question?

oliver-zehentleitner commented 3 years ago

I'm not sure, but as far as I know you would need an await before pop_stream_data_from_stream_buffer().

brianmori commented 3 years ago

My goal is to reduce CPU consumption: I noticed that the polling loop causes a performance hit when the sleep value is low. I'm going to try the await approach that @jon4hz and you proposed.

P.S.: Great work on this lib!

brianmori commented 3 years ago

Awaiting pop_stream_data_from_stream_buffer() would not block the thread, but the idea is still to have a callback function that fires when there is something in the buffer, so the sleep can be avoided.

The function could accept a callback method that is invoked whenever there is something in the stream.

I saw that asyncio supports callbacks.

I have not yet found the right function in the API manager.

oliver-zehentleitner commented 3 years ago

callback: https://oliver-zehentleitner.github.io/unicorn-binance-websocket-api/unicorn_binance_websocket_api.html?highlight=process_stream_data

brianmori commented 3 years ago

Thank you very much, I tested it yesterday evening and it works like a charm.

One last question: have you ever received events from Binance out of order?

oliver-zehentleitner commented 3 years ago

I don't know :)

jon4hz commented 3 years ago

Hey @brianmori, maybe I missed something, but how did you implement it in the end? Did you just put an await in front of pop_stream_data_from_stream_buffer(), or did you change anything else?

brianmori commented 3 years ago

Hi @jon4hz

Here is what I did:

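    from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager

    def ingestion(stream_data, stream_buffer_name=False):
        # called for every received message; filter events here
        print(stream_data)

    # with process_stream_data set, received data goes to ingestion() instead of the stream_buffer
    binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures", process_stream_data=ingestion)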

When an event arrives from the websocket, it is handled without the while True loop and the sleep.

jon4hz commented 3 years ago

Hmm, I've got to check that out, too. But in that case you don't use asyncio, right? Do you know whether that method is compatible with asyncio?

brianmori commented 3 years ago

There's no need for asyncio on my side, because my goal is to handle the received data as soon as it is available, which is exactly what process_stream_data does: it is a callback. The library uses asyncio behind the scenes, so there is no need to implement anything with asyncio for this use case.

oliver-zehentleitner commented 3 years ago

That's right. When you use the callback function, the runtime starts here with an await: https://github.com/oliver-zehentleitner/unicorn-binance-websocket-api/blob/6d967ed0f9d6dc2cc9d36e1fceb041c781b92b4b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_socket.py#L110
A few lines later, it either adds the received data to the stream_buffer or calls your callback function, depending on whether a method is set: https://github.com/oliver-zehentleitner/unicorn-binance-websocket-api/blob/6d967ed0f9d6dc2cc9d36e1fceb041c781b92b4b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_socket.py#L149
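The dispatch Oliver describes can be modelled roughly like this. It is a simplified toy model for illustration, not the library's actual source: `receive_loop` and `make_recv` are hypothetical names, and a fake `recv()` coroutine replaces the real websocket. In this model the callback is called like a plain function, which is consistent with an `async def` callback never being awaited.

```python
import asyncio
from collections import deque


async def receive_loop(recv, stream_buffer, process_stream_data=None):
    # toy model of the socket loop, NOT the library's code
    while True:
        received_stream_data = await recv()  # suspends until a message arrives
        if received_stream_data is None:  # simulated connection close
            break
        if process_stream_data is None:
            stream_buffer.append(received_stream_data)  # default: buffer it for pop_*()
        else:
            process_stream_data(received_stream_data)  # callback set: call it directly


def make_recv(messages):
    pending = deque(messages)

    async def recv():
        await asyncio.sleep(0)  # pretend to wait on the socket
        return pending.popleft() if pending else None

    return recv


buffer, handled = deque(), []
asyncio.run(receive_loop(make_recv(["m1", "m2"]), buffer))
asyncio.run(receive_loop(make_recv(["m3"]), buffer, process_stream_data=handled.append))
print(list(buffer), handled)  # ['m1', 'm2'] ['m3']
```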

But the stream buffer makes sense too. I think it would be good to create an async variant of pop_stream_data_from_stream_buffer() that supports await...
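One possible shape for such an awaitable buffer is sketched below. `AwaitableStreamBuffer` is hypothetical and not part of unicorn-binance-websocket-api. It assumes the data is produced in another thread (as with the manager's socket threads), so it hands items to the consumer's loop with `loop.call_soon_threadsafe()`. It must be created inside the event loop that will await `pop()`, and its `add()` matches the `(stream_data, stream_buffer_name=False)` callback signature used in this thread.

```python
import asyncio
import threading


class AwaitableStreamBuffer:
    """Hypothetical awaitable buffer; not part of unicorn-binance-websocket-api."""

    def __init__(self):
        # must be created inside the event loop that will await pop()
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def add(self, stream_data, stream_buffer_name=False):
        # callable from any thread, e.g. passed as process_stream_data=buffer.add
        self._loop.call_soon_threadsafe(self._queue.put_nowait, stream_data)

    async def pop(self):
        # suspends until data arrives: no polling loop and no sleep
        return await self._queue.get()


async def demo():
    buffer = AwaitableStreamBuffer()

    def producer():
        # a plain thread stands in for the manager's websocket thread
        for message in ("m1", "m2"):
            buffer.add(message)

    thread = threading.Thread(target=producer)
    thread.start()
    received = [await buffer.pop(), await buffer.pop()]
    thread.join()
    return received


print(asyncio.run(demo()))  # ['m1', 'm2']
```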

brianmori commented 3 years ago

@oliver-zehentleitner, we have done some tests and I would like to confirm something.

Suppose pop_stream_data_from_stream_buffer() is not used, in favor of a custom callback (process_stream_data=ingestion).

Is there a risk that, if multiple events arrive at the same time, an event with time T+1 is processed before an event with time T? Our use case is to handle events from Binance in FIFO order.

oliver-zehentleitner commented 3 years ago

I don't think you can receive multiple events at the same time, because they are sent sequentially through the websocket, one after the other. With asyncio we simply start receiving and processing the next event, even if the previous iteration has not finished yet.

Still, you should be prepared for race conditions. Binance also wrote in their docs that under heavy load, events can be delivered out of order over the websocket connection (sorry, I didn't find a link... maybe that has changed?).
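If strict FIFO handling matters, one option (a sketch, not something the library provides) is to keep the callback trivial, let a single consumer thread process messages in the order they were received, and compare Binance's event time field `E` per symbol `s` to flag events that arrive out of order. Assumptions: messages are raw event dicts (with a combined stream and `output='dict'`, the event may instead be nested under a `"data"` key), and `start_fifo_worker` is a hypothetical name.

```python
import queue
import threading


def start_fifo_worker(handle):
    """Handle messages strictly in arrival order on one dedicated thread.

    Assumes raw Binance event dicts with "s" (symbol) and "E" (event time in ms);
    messages without "E" are passed through unchecked.
    """
    inbox = queue.Queue()
    last_event_time = {}

    def worker():
        while True:
            msg = inbox.get()
            if msg is None:  # sentinel: stop the worker
                break
            event_time = msg.get("E")
            if event_time is not None:
                previous = last_event_time.get(msg.get("s"))
                if previous is not None and event_time < previous:
                    # older than an event already handled: flag it, don't silently reorder
                    msg = dict(msg, out_of_order=True)
                else:
                    last_event_time[msg.get("s")] = event_time
            handle(msg)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return inbox, thread


# A process_stream_data callback would just do inbox.put(stream_data).
handled = []
inbox, thread = start_fifo_worker(handled.append)
for event in ({"s": "BNBUSDT", "E": 2}, {"s": "BNBUSDT", "E": 1}, {"s": "BNBUSDT", "E": 3}):
    inbox.put(event)
inbox.put(None)
thread.join()
print([m.get("out_of_order", False) for m in handled])  # [False, True, False]
```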

papapon commented 3 years ago

I combined the code from jon4hz and brianmori:

    import asyncio

    from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager

    async def ingestion(stream_data, stream_buffer_name=False):
        await asyncio.sleep(3)
        # filter events here
        print(stream_data)

    async def main():
        bsm = await BinanceWebSocketApiManager(exchange="binance.com", process_stream_data=ingestion)
        channels = ['trade', 'kline_1m']
        markets = ['bnbusdt', 'wtcusdt']
        bsm.create_stream(channels, markets, output='dict')

    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        loop.run_until_complete(main())

The problem is that I can't await BinanceWebSocketApiManager, but if I remove the await, ingestion is never awaited. What is the best solution?

oliver-zehentleitner commented 3 years ago

process_stream_data() is started with an await, as explained here: https://github.com/oliver-zehentleitner/unicorn-binance-websocket-api/issues/144#issuecomment-778647505

If you do it that way, I think there is nothing more to do...

papapon commented 3 years ago

Thank you for your quick response, but I'm not sure I understand. Perhaps my first message was not clear.

brianmori's code works, but I need my ingestion function to be async, because inside it I insert into Postgres with asyncpg.

That is because I was previously using a websocket client built on asyncio (from the python-binance repo, asyncio branch). I want to switch because that code is no longer maintained, but I don't want to change everything that comes with it.

jon4hz commented 3 years ago

That's the same reason I wrote my async code, haha. I don't think you can use an async function as the callback directly. However, you can use a regular function as the callback and have it schedule the ingestion coroutine on the event loop.

    import asyncio

    from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import BinanceWebSocketApiManager

    async def ingestion(stream_data, stream_buffer_name=False):
        # filter events here
        print(stream_data)

    def callback(stream_data, stream_buffer_name=False):
        # regular (sync) callback: schedules ingestion() as a task on the event loop running
        # in the thread that invokes this callback (the manager's socket loop, not main()'s loop)
        asyncio.ensure_future(ingestion(stream_data, stream_buffer_name))

    async def main():
        bsm = BinanceWebSocketApiManager(exchange="binance.com", process_stream_data=callback)
        channels = ['trade', 'kline_1m']
        markets = ['bnbusdt', 'wtcusdt']
        bsm.create_stream(channels, markets, output='dict')
        # keep main() alive while the manager is running
        while not bsm.is_manager_stopping():
            await asyncio.sleep(1)

    if __name__ == "__main__":
        asyncio.run(main())

papapon commented 3 years ago

Thank you jon4hz.

This is exactly what I was looking for. I thought I was going to have to switch everything to sync :-).

papapon commented 3 years ago

Is there any other solution?

After integrating it into my code, I now get the error "attached to a different loop". Is it possible to link it to the initial loop instead?
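One common way around "attached to a different loop" is to keep a reference to the loop that owns your asyncpg pool and submit the coroutine to that loop with `asyncio.run_coroutine_threadsafe()`, instead of `ensure_future()`, which schedules onto whichever loop is running in the calling thread. The sketch assumes, as discussed above, that the callback runs in a separate thread with its own loop; `fake_socket_thread` and `make_callback` are hypothetical stand-ins for the manager and your code. It needs Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import threading


async def ingestion(stream_data, results):
    # runs on main_loop, so objects bound to that loop (e.g. an asyncpg pool) can be used here
    results.append((stream_data, asyncio.get_running_loop()))


def make_callback(main_loop, results, pending):
    def callback(stream_data, stream_buffer_name=False):
        # thread-safe hand-off: schedule ingestion() on main_loop, not on the caller's loop
        future = asyncio.run_coroutine_threadsafe(ingestion(stream_data, results), main_loop)
        pending.append(future)  # kept only so this demo can wait for completion
    return callback


def fake_socket_thread(callback, messages):
    # stands in for the manager's worker thread, which runs its own event loop
    async def receive():
        for message in messages:
            callback(message)
    asyncio.run(receive())


async def main():
    main_loop = asyncio.get_running_loop()
    results, pending = [], []
    callback = make_callback(main_loop, results, pending)
    worker = threading.Thread(target=fake_socket_thread, args=(callback, ["m1", "m2"]))
    worker.start()
    await asyncio.to_thread(worker.join)  # wait for the thread without blocking main_loop
    await asyncio.gather(*(asyncio.wrap_future(f) for f in pending))
    return main_loop, results


main_loop, results = asyncio.run(main())
print([data for data, _ in results])  # ['m1', 'm2']
print(all(loop is main_loop for _, loop in results))  # True
```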

oliver-zehentleitner commented 3 years ago

Not now! :D

The loops are created here:

I think you would need to save the loop in the scope of the manager instance, in stream_list.

You could then build a getter method like get_loop_by_stream_id(stream_id).

I would appreciate a pull request, because I want to try to fix the issue of exiting the loop from outside.

oliver-zehentleitner commented 3 years ago

get_event_loop_by_stream_id() is coded and will be released with 1.30.0.