python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License
1.24k stars 148 forks source link

Suggestions for improving code performance #478

Open River-Shi opened 2 months ago

River-Shi commented 2 months ago

This code calculates, in real time, the rolling average of (futures price / spot price − 1). A large amount of data can stream in from the websocket every second (about 100–200 data points), so do you have any suggestions for improving performance?

image
import asyncio
import numpy as np
from streamz import Stream
from tradebot.exchange import BinanceWebsocketManager
from tradebot.entity import log_register
from tradebot.constants import MARKET_URLS

# Pipeline overview:
#   1. compute the ratio future trade price / spot trade price, minus 1
#   2. push each value into a sliding window of ``window_size`` values
#   3. print the mean of the current window

log = log_register.get_logger("BTCUSDT", level="INFO", flush=False)

# One source stream per market; cb_spot / cb_future feed them.
spot_stream, future_stream = Stream(), Stream()

window_size = 20  # number of ratios averaged for each printed value

def cb_future(msg):
    """Forward a futures websocket message to ``future_stream``.

    Only messages containing an ``"e"`` key are forwarded; anything else
    (presumably control replies such as the SUBSCRIBE acknowledgement) is
    dropped.
    """
    if "e" not in msg:
        return
    future_stream.emit(msg)

def cb_spot(msg):
    """Forward a spot websocket message to ``spot_stream``.

    Messages without an ``"e"`` key are ignored; only those carrying it
    are emitted downstream.
    """
    has_event = "e" in msg
    if not has_event:
        return
    spot_stream.emit(msg)

async def main():
    """Print the rolling mean of (futures price / spot price - 1) for BTCUSDT.

    The streamz pipeline is wired first, then both websocket clients are
    subscribed to the BTCUSDT trade stream. The coroutine then idles until it
    is cancelled (on Python 3.11+ ``asyncio.run`` does this on Ctrl-C). Both
    clients are closed on every exit path, and the cancellation is allowed to
    propagate instead of being swallowed.
    """
    # combine_latest emits (latest spot msg, latest future msg) whenever either
    # side updates; the "p" field of each message is parsed as a float price.
    ratio = spot_stream.combine_latest(future_stream).map(
        lambda pair: float(pair[1]["p"]) / float(pair[0]["p"]) - 1
    )
    # np.mean can be passed directly; a wrapping lambda only adds a call.
    ratio.sliding_window(window_size).map(np.mean).sink(
        lambda x: print(f"Ratio Mean: {x:.8f}")
    )

    # Create the clients outside ``try`` so ``finally`` never refers to an
    # unbound name if construction itself fails.
    ws_spot_client = BinanceWebsocketManager(base_url="wss://stream.binance.com:9443/ws")
    ws_um_client = BinanceWebsocketManager(base_url="wss://fstream.binance.com/ws")
    try:
        await ws_um_client.subscribe_trade("BTCUSDT", callback=cb_future)
        await ws_spot_client.subscribe_trade("BTCUSDT", callback=cb_spot)
        # Park until cancelled; the clients' background tasks do the work.
        await asyncio.Event().wait()
    finally:
        # Runs on cancellation and on any other error alike.
        await ws_spot_client.close()
        await ws_um_client.close()
        print("Websocket closed")


if __name__ == "__main__":
    asyncio.run(main())

Here is the implementation of BinanceWebsocketManager:

class WebsocketManager(ABC):
    """Base class that routes websocket messages to per-subscription callbacks.

    Subclasses implement :meth:`_subscribe`, which puts decoded messages on
    ``self._subscripions[subscription_id]``. :meth:`_consume` drains that
    queue and invokes the user callback for each message.
    """

    def __init__(
        self,
        base_url: str,
        ping_interval: int = 5,
        ping_timeout: int = 5,
        close_timeout: int = 1,
        max_queue: int = 12,
    ):
        self._base_url = base_url
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout
        self._close_timeout = close_timeout
        self._max_queue = max_queue

        # Every background task started by this manager; cancelled by close().
        self._tasks: List[asyncio.Task] = []
        # subscription_id -> asyncio.Queue of messages. The attribute keeps its
        # historical spelling because subclasses access it by this name.
        self._subscripions = defaultdict(asyncio.Queue)
        self._log = log_register.get_logger(name=type(self).__name__, level="INFO", flush=True)

    async def _consume(self, subscription_id: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Forward every message queued for ``subscription_id`` to ``callback``.

        ``callback`` may be a plain function or a coroutine function. It is
        called as ``callback(msg, *args, **kwargs)``. With ``callback=None``,
        messages are drained and discarded. If the callback raises, the error
        is logged and consumption continues. Otherwise one bad message would
        silently stop the subscription while its queue keeps growing.
        """
        queue = self._subscripions[subscription_id]
        # Resolve the callback kind once instead of on every message.
        is_coroutine = asyncio.iscoroutinefunction(callback)
        while True:
            msg = await queue.get()
            try:
                if is_coroutine:
                    await callback(msg, *args, **kwargs)
                elif callback is not None:
                    callback(msg, *args, **kwargs)
            except Exception as exc:
                self._log.error(f"Callback for {subscription_id} raised {exc!r}")
            finally:
                # Always acknowledge, so queue.join() cannot hang on a failure.
                queue.task_done()

    @abstractmethod
    async def _subscribe(self, payload: Dict[str, Any], subscription_id: str):
        """Send ``payload`` over a connection and queue incoming messages.

        Implementations put each received message on
        ``self._subscripions[subscription_id]``.
        """

    async def close(self):
        """Cancel all tasks started by this manager and wait for them to end.

        Task exceptions are collected, not raised. The task list and the
        subscription queues are reset so the manager can subscribe again.
        """
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._subscripions.clear()
        self._log.info("All WebSocket connections closed.")

class BinanceWebsocketManager(WebsocketManager):
    """Binance market-stream client: one websocket connection per subscription."""

    def __init__(self, base_url: str):
        super().__init__(
            base_url=base_url,
            ping_interval=5,
            ping_timeout=5,
            close_timeout=1,
            max_queue=12,
        )

    async def _subscribe(self, payload: Dict[str, Any], subscription_id: str):
        """Keep a connection open and (re)send ``payload`` on every connect.

        ``websockets.connect`` used with ``async for`` yields a fresh connection
        after each drop, so the SUBSCRIBE request is re-sent each time. Each
        received message is decoded with orjson and put on the queue for
        ``subscription_id``.
        """
        # Serialize once, outside the loop. Rebinding ``payload`` to its JSON
        # text inside the loop would double-encode it on every reconnect.
        message = json.dumps(payload)
        queue = self._subscripions[subscription_id]
        async for websocket in websockets.connect(
            uri=self._base_url,
            ping_interval=self._ping_interval,
            ping_timeout=self._ping_timeout,
            close_timeout=self._close_timeout,
            max_queue=self._max_queue,
        ):
            try:
                await websocket.send(message)
                async for raw in websocket:
                    await queue.put(orjson.loads(raw))
            except websockets.ConnectionClosed:
                self._log.error("Connection closed, reconnecting...")

    def _start_subscription(self, subscription_id: str, stream_name: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Start the consumer and connection tasks for one stream, at most once."""
        if subscription_id in self._subscripions:
            self._log.info(f"Already subscribed to {subscription_id}")
            return
        # Register the queue now rather than lazily inside _consume. Otherwise a
        # second call made before the new tasks run would pass the check above.
        self._subscripions[subscription_id] = asyncio.Queue()
        payload = {
            "method": "SUBSCRIBE",
            "params": [stream_name],
            "id": int(time.time() * 1000),
        }
        # Pass callback positionally. With ``callback=callback, *args`` the
        # first extra arg would also bind to ``callback`` (TypeError).
        self._tasks.append(asyncio.create_task(self._consume(subscription_id, callback, *args, **kwargs)))
        self._tasks.append(asyncio.create_task(self._subscribe(payload, subscription_id)))

    async def subscribe_book_ticker(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to ``<symbol>@bookTicker``; messages go to ``callback``."""
        self._start_subscription(f"book_ticker.{symbol}", f"{symbol.lower()}@bookTicker", callback, *args, **kwargs)

    async def subscribe_book_tickers(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call :meth:`subscribe_book_ticker` for each symbol."""
        for symbol in symbols:
            await self.subscribe_book_ticker(symbol, callback, *args, **kwargs)

    async def subscribe_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to ``<symbol>@trade``; messages go to ``callback``."""
        self._start_subscription(f"trade.{symbol}", f"{symbol.lower()}@trade", callback, *args, **kwargs)

    async def subscribe_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call :meth:`subscribe_trade` for each symbol."""
        for symbol in symbols:
            await self.subscribe_trade(symbol, callback, *args, **kwargs)

    async def subscribe_agg_trade(self, symbol: str, callback: Callable[..., Any] = None, *args, **kwargs):
        """Subscribe to ``<symbol>@aggTrade``; messages go to ``callback``."""
        self._start_subscription(f"agg_trade.{symbol}", f"{symbol.lower()}@aggTrade", callback, *args, **kwargs)

    async def subscribe_agg_trades(self, symbols: List[str], callback: Callable[..., Any] = None, *args, **kwargs):
        """Call :meth:`subscribe_agg_trade` for each symbol."""
        for symbol in symbols:
            await self.subscribe_agg_trade(symbol, callback, *args, **kwargs)
martindurant commented 2 months ago

Do you find the performance is lacking? 100 events per second is not a lot for streamz. However, the library you are using and websocket latency are other things, as indeed any CPU time you might be needing for the calculation - I don't know from your code.

River-Shi commented 2 months ago

Do you find the performance is lacking? 100 events per second is not a lot for streamz. However, the library you are using and websocket latency are other things, as indeed any CPU time you might be needing for the calculation - I don't know from your code.

Thanks for the help. I have another question:

from streamz import Stream
import asyncio
import time

def increment(x):
    """Return ``x + 1`` after a deliberate 0.1 s blocking sleep."""
    time.sleep(0.1)
    result = x + 1
    return result

async def write(x):
    """Wait 0.2 s without blocking the event loop, then print ``x``."""
    pause = 0.2
    await asyncio.sleep(pause)
    print(x)

async def f():
    """Feed 0..9 into an async pipeline, awaiting each emit before the next."""
    source = Stream(asynchronous=True)
    incremented = source.map(increment)
    incremented.rate_limit(0.500).sink(write)

    for value in range(10):
        await source.emit(value)


if __name__ == "__main__":
    asyncio.run(f())
from tornado import gen
import time
from streamz import Stream
from tornado.ioloop import IOLoop
def increment(x):
    """Add one to ``x``, the slow blocking way.

    Stands in for a computational function that was not written to
    cooperate with an event loop: it holds the thread for 0.1 s.
    """
    time.sleep(0.1)
    plus_one = x + 1
    return plus_one

@gen.coroutine
def write(x):
    """Non-blocking stand-in for an asynchronous database write.

    Yields a 0.2 s ``gen.sleep`` back to the IOLoop, then prints ``x``.
    """
    delay = gen.sleep(0.2)
    yield delay
    print(x)

@gen.coroutine
def f():
    """Push 0..9 through the pipeline, waiting on each emit in turn."""
    source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
    source.map(increment).rate_limit(0.500).sink(write)

    for x in range(10):
        yield source.emit(x)


# Start the event loop only when run as a script, never as an import side effect.
if __name__ == "__main__":
    IOLoop().run_sync(f)

What is the difference between these two examples? I don't think there's any difference in performance between using async and sync here; `await emit` still blocks the subsequent processing. I tried using asyncio.create_task(source.emit(x)), but that just threw an error.

I think there is also no difference from this synchronous version:

from streamz import Stream
import asyncio
import time

def increment(x):
    """Block the thread for 0.1 s, then hand back the successor of ``x``."""
    time.sleep(0.1)
    successor = x + 1
    return successor

def write(x):
    """Blocking stand-in for a slow sink: sleep 0.2 s, then print ``x``."""
    delay_seconds = 0.2
    time.sleep(delay_seconds)
    print(x)

def f():
    """Synchronous variant: every ``emit`` is called from plain blocking code."""
    source = Stream()
    limited = source.map(increment).rate_limit(0.500)
    limited.sink(write)

    for value in range(10):
        source.emit(value)


if __name__ == "__main__":
    f()
martindurant commented 2 months ago

Correct, there will be no difference to a linear chain of event processing. The point of await, is that other async things can be happening at the same time ("concurrently"). In this case, there are no other things to process while waiting.

River-Shi commented 2 months ago

Correct, there will be no difference to a linear chain of event processing. The point of await, is that other async things can be happening at the same time ("concurrently"). In this case, there are no other things to process while waiting.

Can you give me some examples of concurrent or non-linear event-processing chains? I'm really struggling to think of any applications. I am trying to emit concurrently, but it causes an error.

from streamz import Stream
import asyncio
import time

def increment(x):
    """Simulated CPU-bound step: sleep 0.1 s (blocking), return ``x + 1``."""
    time.sleep(0.1)
    next_value = x + 1
    return next_value

async def write(x):
    """Simulated slow async sink: yield to the loop for 0.2 s, then print."""
    wait_for = 0.2
    await asyncio.sleep(wait_for)
    print(x)

async def f():
    """Emit 0..9 into the pipeline and wait for all of them together.

    ``asyncio.create_task`` accepts only coroutine objects. On an
    asynchronous stream, ``source.emit`` appears to return a future-like
    awaitable instead (verify against your streamz version).
    ``asyncio.gather`` accepts any awaitable. Awaiting it also keeps ``f``
    from returning before the emitted values are processed; otherwise
    ``asyncio.run`` would cancel the still-pending work.

    Note that ``map(increment)`` still blocks for 0.1 s inside each ``emit``
    call, so only the non-blocking ``write`` waits overlap.
    """
    source = Stream(asynchronous=True)
    source.map(increment).rate_limit(0.500).sink(write)

    await asyncio.gather(*[source.emit(x) for x in range(10)])


if __name__ == "__main__":
    asyncio.run(f())
martindurant commented 2 months ago
    for x in range(10):
        # NOTE(review): asyncio.create_task() requires a coroutine object; if
        # source.emit(x) returns a future-like awaitable here, that is the
        # error being raised — verify. The created tasks are never awaited.
        asyncio.create_task(source.emit(x)) # raise error

This does indeed kick off all the coroutines, but each one first does a blocking wait and then waits again before producing output.

Consider:

async def write(x):
    """Print ``x`` immediately, then keep the coroutine alive for 0.2 s."""
    print(x)
    hold = 0.2
    await asyncio.sleep(hold)

async def f():
    """Emit 0..9 concurrently and wait for all of them with ``gather``."""
    source = Stream(asynchronous=True)
    source.sink(write)
    pending = [source.emit(value) for value in range(10)]
    await asyncio.gather(*pending)


if __name__ == "__main__":
    asyncio.run(f())

Here, all the values print immediately, and the whole takes 0.2s to run.

Material-Scientist commented 1 month ago
.emit(msg)

.emit() blocks on each message, whereas ._emit() does not (it returns a list of futures). You could instead buffer a set number of futures before calling await asyncio.gather(*futures) on them.

Also, asynchronous=True will launch the ioloop in the current thread, whereas asynchronous=False will launch it on a separate thread. I've had issues in the past where I could not use asynchronous=True because the event loop was already running in a .py script.