Navigation support and y-range update with real time plotting #74

degloff commented 3 years ago

First, thanks a lot for this library; it is very useful to me.

I cooked up a real-time plot example that uses cryptofeed to pull order book data, following the BitMEX example in this repo.

The problems I face are:

  1. The horizontal volume plot goes off the window, because the y scale is determined by the candles. What is the best way to fix that? Do I have to calculate the range myself?
  2. What about having another window on the left where such volume plots can be added?
  3. After zooming in, new data may be far to the right and you have to scroll quite a bit to reach it. How can I add quick navigation (a button or a scroll "fly" mode) to jump to the rightmost data, so the real-time data is visible?
  4. Any other ideas for improvement? Am I using the update logic correctly? I had to use Process because of cryptofeed limitations, which did not allow me to run the feed in another thread. I also have to update the order book in that process; otherwise messages may be lost in the pipe, leading to a wrong order book (I experienced this myself).
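The order book concern in point 4 comes down to one invariant: every delta has to be applied, in order, to the same snapshot, and only then can a trimmed view be shipped elsewhere. A minimal, dependency-free sketch of that bookkeeping, assuming cryptofeed-style deltas given as lists of (price, quantity) pairs where a zero quantity deletes the level. It uses a plain dict plus sorting instead of SortedDict, and apply_delta() and top_levels() are hypothetical names:

```python
from decimal import Decimal


def apply_delta(side_book: dict, delta: list) -> None:
    """Apply (price, quantity) updates to one side of the book in place.

    A zero quantity removes the level; any other quantity sets its new size.
    """
    for price, quantity in delta:
        if quantity == 0:
            side_book.pop(price, None)  # removing an unknown level is a no-op
        else:
            side_book[price] = quantity


def top_levels(side_book: dict, n: int, best_is_highest: bool) -> list:
    """Return the n best levels as (price, quantity, cumulative quantity) tuples.

    Bids are best when highest, asks when lowest.
    """
    prices = sorted(side_book, reverse=best_is_highest)[:n]
    levels, total = [], Decimal(0)
    for price in prices:
        total += side_book[price]
        levels.append((price, side_book[price], total))
    return levels
```

Only the output of top_levels() would need to cross the process boundary, which keeps the pipe traffic small while the full book stays consistent in the feed process.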

Here is the code so you can try it out. I'm happy to contribute it to the samples.

#!/usr/bin/env python3

from decimal import Decimal
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection

import dateutil.parser
import finplot as fplt
import pandas as pd
import pytz
import requests
from cryptofeed import FeedHandler
from cryptofeed.callback import BookCallback, BookUpdateCallback, TradeCallback
from cryptofeed.defines import L2_BOOK, BOOK_DELTA, TRADES
from cryptofeed.exchanges import (Binance)
from sortedcontainers import SortedDict

utc2timestamp = lambda s: int(dateutil.parser.parse(s).replace(tzinfo=pytz.utc).timestamp() * 1000)

def download_price_history(symbol, start_time, end_time, interval_mins):
    interval_ms = 1000 * 60 * interval_mins
    interval_str = '%sm' % interval_mins if interval_mins < 60 else '%sh' % (interval_mins // 60)
    start_time = utc2timestamp(start_time)
    end_time = utc2timestamp(end_time)
    data = []
    for start_t in range(start_time, end_time, 1000 * interval_ms):
        end_t = start_t + 1000 * interval_ms
        if end_t >= end_time:
            end_t = end_time - interval_ms
        url = 'https://www.binance.com/fapi/v1/klines?interval=%s&limit=%s&symbol=%s&startTime=%s&endTime=%s' % (
            interval_str, 1000, symbol, start_t, end_t)
        d = requests.get(url).json()
        data += d
    df = pd.DataFrame(data, columns='t o h l c v ax bx cx dx ex fx'.split())
    return df.astype({'t': 'datetime64[ms]', 'o': float, 'h': float, 'l': float, 'c': float, 'v': float})

class RealtimePlotter:
    def __init__(self, exchange='Binance', symbol='BTCUSDT', start_time='2020-12-08', end_time='2020-12-10', interval_mins=1, num_levels=50):
        self.num_levels = num_levels
        self.orderbook = None
        self.trade_output, self.trade_input = Pipe()
        self.book_output, self.book_input = Pipe()
        self.plots = []
        self.ohlcv = download_price_history(symbol, start_time, end_time, interval_mins)
        self.ax = fplt.create_plot(f'Realtime {exchange} {symbol}', init_zoom_periods=720, maximize=False)

    def update_candlestick_data(self, trade, interval_mins=1):
        t = int(trade['timestamp'])
        t -= t % (60 * interval_mins)
        t = pd.to_datetime(t, unit='s')
        last_price = float(trade['price'])
        amount = float(trade['amount'])
        t_last = self.ohlcv['t'].iloc[-1]
        if t < t_last:
            return  # ignore already-recorded trades
        elif t > t_last:
            # add a new candle that opens at the previous close
            o = self.ohlcv['c'].iloc[-1]
            h = max(o, last_price)
            l = min(o, last_price)
            df = pd.DataFrame(dict(t=[t], o=[o], c=[last_price], h=[h], l=[l], v=[amount]))
            self.ohlcv = pd.concat([self.ohlcv, df], ignore_index=True, sort=False)
        else:
            # same interval: update the last candle in place
            i = self.ohlcv.index.max()
            self.ohlcv.loc[i, 'c'] = last_price
            self.ohlcv.loc[i, 'v'] += amount
            if last_price > self.ohlcv.loc[i, 'h']:
                self.ohlcv.loc[i, 'h'] = last_price
            if last_price < self.ohlcv.loc[i, 'l']:
                self.ohlcv.loc[i, 'l'] = last_price

    def aggregate_ordebook(self):
        # copy, so re-aggregating the same snapshot does not shift/cumsum it twice
        bids = self.orderbook['book']['bid'].copy()
        asks = self.orderbook['book']['ask'].copy()
        bids['price'] -= 0.5
        bids['volume'] = -bids['volume'].cumsum()
        asks['volume'] = -asks['volume'].cumsum()
        aggregated_book = pd.concat([bids.iloc[::-1], asks])
        return [[len(self.ohlcv) + 0.5, aggregated_book]]

    def update_plot(self):
        while self.trade_output.poll():
            trade = self.trade_output.recv()
            self.update_candlestick_data(trade)
            # print(f'trade {trade["price"]} {trade["amount"]} {trade["timestamp"]}')

        while self.book_output.poll():
            self.orderbook = self.book_output.recv()
            print(f'top {self.orderbook["book"]["bid"].iloc[0].values.tolist()} <-> {self.orderbook["book"]["ask"].iloc[0].values.tolist()}')

        candlesticks = self.ohlcv['t o c h l'.split()]
        volumes = self.ohlcv['t o c v'.split()]

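        if not self.plots:  # 1st time: create the plots
            self.plots.append(fplt.candlestick_ochl(candlesticks))
            self.plots.append(fplt.volume_ocv(volumes, ax=self.ax.overlay()))
            # placeholder order book (a single level) to the right of the last candle
            x = len(candlesticks) + 2.5
            y = candlesticks.c.iloc[-1]
            orderbook = [[x, [(y, 1)]]]
            orderbook_colorfunc = fplt.horizvol_colorfilter([(0, 'bull'), (10, 'bear')])
            self.plots.append(fplt.horiz_time_volume(orderbook, candle_width=1, draw_body=10, colorfunc=orderbook_colorfunc))
        else:  # afterwards: push new data into the existing plots
            self.plots[0].update_data(candlesticks)
            self.plots[1].update_data(volumes)
            if self.orderbook is not None:
                aggregated = self.aggregate_ordebook()
                self.plots[2].update_data(aggregated)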

class SingleExchangeFeedHandler:
    def __init__(self, symbol: str, num_levels: int, trade_input: Connection, book_input: Connection):
        self.symbol = symbol
        self.num_levels = num_levels
        self.trade_input = trade_input
        self.book_input = book_input
        self.orderbook = None
        self.oderbook_bid_view = None
        self.oderbook_ask_view = None

    def apply_book_delta(self, delta: dict, side: str):
        delta = delta[side]
        book: SortedDict = self.orderbook[side]
        for level, quantity in delta:
            if quantity == Decimal('0E-8'):
                # zero quantity: the level was removed from the book
                if level in book:
                    del book[level]
            else:
                # new level or changed quantity
                book[level] = quantity

    async def trade(self, feed, pair, order_id, timestamp, side, amount, price, receipt_timestamp):
        assert isinstance(timestamp, float)
        assert isinstance(side, str)
        assert isinstance(amount, Decimal)
        assert isinstance(price, Decimal)
        data = dict(timestamp=timestamp, price=price, amount=amount)
        self.trade_input.send(data)
        # print(f'----> trade {price} {amount} {timestamp}')

    async def book(self, feed, pair, book, timestamp, receipt_timestamp):
        # keep the full snapshot; book_update() applies deltas to it and sends the top levels
        self.orderbook = book
        self.oderbook_bid_view = book['bid'].items()
        self.oderbook_ask_view = book['ask'].items()

    async def book_update(self, feed, pair, delta, timestamp, receipt_timestamp):
        self.apply_book_delta(delta, 'bid')
        self.apply_book_delta(delta, 'ask')
        nb = min(self.num_levels, len(self.oderbook_bid_view))
        na = min(self.num_levels, len(self.oderbook_ask_view))
        bids = pd.DataFrame(self.oderbook_bid_view[-1:-(nb+1):-1], columns=['price', 'volume']).astype(float)
        asks = pd.DataFrame(self.oderbook_ask_view[0:na:1], columns=['price', 'volume']).astype(float)
        data = dict(
            book=dict(bid=bids, ask=asks),
        )
        self.book_input.send(data)
        # print(f'----> {self.orderbook["bid"].items()[-1]} <-> {self.orderbook["ask"].items()[0]}')

    def run(self):
        f = FeedHandler()
        f.add_feed(Binance(pairs=[self.symbol],
                           channels=[TRADES, L2_BOOK],
                           callbacks={
                               TRADES: TradeCallback(self.trade),
                               L2_BOOK: BookCallback(self.book),
                               BOOK_DELTA: BookUpdateCallback(self.book_update),
                           }))

        print('starting crypto market feed handler')
        f.run()

if __name__ == '__main__':
    plotter = RealtimePlotter(symbol='BTCUSDT', start_time='2020-12-09', num_levels=200)

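    # cryptofeed uses normalized 'BASE-QUOTE' symbols, unlike the Binance REST API used for the history
    feed_handler = SingleExchangeFeedHandler('BTC-USDT', plotter.num_levels, plotter.trade_input, plotter.book_input)

    # run the feed handler in its own process; it talks to the plotter through the pipes
    p = Process(target=feed_handler.run, args=())
    p.start()

    fplt.timer_callback(plotter.update_plot, 0.1)  # poll the pipes and redraw every 100 ms
    fplt.show()
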
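For reference, the trade-to-candle bookkeeping that update_candlestick_data() performs can be expressed without pandas, which makes its three cases (stale trade, new interval, same interval) easy to test in isolation. A minimal sketch, assuming trades carry epoch-second timestamps; Candle and add_trade() are hypothetical names, not part of finplot or cryptofeed:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Candle:
    t: int    # start of the candle interval, in epoch seconds
    o: float
    h: float
    l: float
    c: float
    v: float


def add_trade(candles: List[Candle], timestamp: float, price: float, amount: float,
              interval_secs: int = 60) -> None:
    """Fold one trade into the candle list (same branching as update_candlestick_data())."""
    t = int(timestamp)
    t -= t % interval_secs  # floor to the start of the interval
    last = candles[-1]
    if t < last.t:
        return  # trade belongs to an already-closed candle: ignore it
    if t > last.t:
        # new candle: opens at the previous close, then absorbs this trade
        o = last.c
        candles.append(Candle(t, o, max(o, price), min(o, price), price, amount))
    else:
        # same interval: extend the last candle
        last.c = price
        last.v += amount
        last.h = max(last.h, price)
        last.l = min(last.l, price)
```

Keeping the "new candle" and "update last candle" paths in mutually exclusive branches matters: if both ran for the first trade of an interval, its amount would be counted twice.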
highfestiva commented 3 years ago
  1. Unfortunately I didn't do anything more on that. The horizontal volume has a data structure of its own, and doesn't play as nice with the rest of the code. A pull request would be much appreciated.
  2. It's possible by using Qt directly and doing the layout yourself with create_plot_widget(), instead of using the standard layout.
  3. Press Home button. If that's too far off for your hand, you'll have to add a pinch of code to handle some additional mouse button or so.
  4. I have no opinions on your code as long as it works, but I'm curious whether you have suggestions for improvements regarding .update_data() in particular, or the API in general.
degloff commented 3 years ago

Thanks for the reply. I'll see what I can do.

There is another catch with update_data, I tried to use the stepMode to plot lines like this:

line1: pg.PlotDataItem = plt.plot(x, y, pen='g', symbol='x', symbolPen='g', symbolBrush=0.2, name='green', stepMode=True)

The catch is that `len(x) == len(y)+1` is required. Imagine that x is time; then the last (x, y) step is extended horizontally until "now". The current setup of update_data does not allow me to do that.

Any ideas?
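With pyqtgraph's stepMode=True, x holds the step edges and y the levels, so there is always one more x than y. For a time series, one way to satisfy that is to treat "now" as the closing edge of the last step. A plain-Python sketch, assuming numeric timestamps; step_edges() is a hypothetical helper, and it only prepares the arrays, so it does not get around the equal-length requirement of finplot's data source discussed below:

```python
from typing import List, Sequence, Tuple


def step_edges(times: Sequence[float], values: Sequence[float],
               now: float) -> Tuple[List[float], List[float]]:
    """Build x/y arrays for a step plot where len(x) == len(y) + 1.

    Each value is held from its own timestamp until the next one, and the
    last value is held until `now`, which supplies the extra x edge.
    """
    if len(times) != len(values):
        raise ValueError('times and values must have the same length')
    if times and now < times[-1]:
        raise ValueError('now must not precede the last timestamp')
    return list(times) + [now], list(values)
```

Calling it again on every timer tick, with the current time as `now`, would keep the last step stretching to the right edge.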

degloff commented 3 years ago

BTW, it would help to understand the rationale behind update_data better. Could you explain it a bit more?

highfestiva commented 3 years ago

If you have an additional data point on your y-axis, it needs a time to be part of a time series, and to be plotted at all. Otherwise, at what time should it go?

The rationale of update_data() is unfortunately missing. I built it organically/ad hoc, and still haven't looked back. It's pretty slow, highly complicated, and already contains a large amount of logic just to get Qt to re-render. But what tells me it could be a lot better is that it's always clunky to use. You always have to loop and if-not-created-then-else-update, which is ugly.
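The "if-not-created-then-else-update" branch can at least be moved out of the user's loop. A library-agnostic sketch of that idea; PlotRegistry is a hypothetical name, not part of finplot, and the create/update callables are supplied by the caller:

```python
from typing import Any, Callable, Dict, Hashable


class PlotRegistry:
    """Create a plot item on first use and update it on every later call."""

    def __init__(self) -> None:
        self._items: Dict[Hashable, Any] = {}

    def plot(self, key: Hashable, data: Any,
             create: Callable[[Any], Any],
             update: Callable[[Any, Any], None]) -> Any:
        item = self._items.get(key)
        if item is None:
            # first call for this key: build the item
            item = self._items[key] = create(data)
        else:
            # later calls: push the new data into the existing item
            update(item, data)
        return item
```

With finplot, the candle plot could then be driven by a single line such as `registry.plot('candles', candlesticks, fplt.candlestick_ochl, lambda item, df: item.update_data(df))`, assuming the same create and update calls used in the script earlier in this thread.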

degloff commented 3 years ago

If we do not have an additional point, we could just duplicate the last x value. I need to check your code to see how that could be added.

highfestiva commented 3 years ago

That would just add two datapoints at the same point in time, i.e. two candles on a single X-position. This belongs in user space, not in the lib.

degloff commented 3 years ago

No, that does not work, as the data setup enforces the same length for x and y when you pull it from the PandasDataSource. I tried that already.

degloff commented 3 years ago

I went through the update_data path and it has indeed grown a bit organically. It's hard to tell from the code what is a feature, what is needed, and what can be simplified.

highfestiva commented 3 years ago

For bullet number three, you can now use the set_x_pos() function in the API to move the viewport. See the example.
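To jump to the live edge while keeping roughly the current zoom, the bounds can be taken from the data itself. A minimal sketch; latest_window() is a hypothetical helper that returns the first and last timestamps of the newest `periods` candles:

```python
from typing import Sequence, Tuple, TypeVar

T = TypeVar('T')


def latest_window(times: Sequence[T], periods: int) -> Tuple[T, T]:
    """Return (first, last) timestamps of the newest `periods` candles."""
    if not times:
        raise ValueError('no data to show')
    periods = max(1, min(periods, len(times)))  # clamp to the available data
    return times[-periods], times[-1]
```

In the script above this might be used as `xmin, xmax = latest_window(plotter.ohlcv['t'].tolist(), 120)` followed by `fplt.set_x_pos(xmin, xmax)`, assuming set_x_pos() accepts timestamps in the same form as the plotted data; calling it from a key handler or the timer callback would give the quick navigation asked about.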

highfestiva commented 3 years ago

I've thought about your first bullet, and I'm pretty satisfied with keeping it the way it currently is. That is, when only a volume profile is plotted, zooming and panning follow it; but when it is used in combination with higher-resolution data (candles), it falls into the background (standalone). That should answer all of your questions.
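Since the volume profile deliberately stays out of the autoscaling, keeping the order book in view means computing the y-range in user code, the fallback raised in point 1 of the question. A sketch of that computation, assuming the caller has already selected the candles currently in view; combined_y_range() is a hypothetical helper, and applying the result (for instance via pyqtgraph's ViewBox.setYRange() on the plot's view box) is an assumption that would need checking against finplot's own y-zoom handling:

```python
from typing import Iterable, Sequence, Tuple


def combined_y_range(lows: Sequence[float], highs: Sequence[float],
                     book_prices: Iterable[float],
                     pad: float = 0.02) -> Tuple[float, float]:
    """Return (ymin, ymax) covering the visible candles and the order book levels.

    `pad` adds that fraction of the total span on both sides.
    """
    prices = list(lows) + list(highs) + list(book_prices)
    if not prices:
        raise ValueError('nothing to fit')
    lo, hi = min(prices), max(prices)
    margin = (hi - lo) * pad
    return lo - margin, hi + margin
```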