femtotrader opened this issue 6 years ago
@mrocklin I noticed your comment http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1#comment-3599845311
Pull requests welcome.
A first step could be to define an API for this streaming approach to resampling, given that resampling a pandas Series with .ohlc() returns a DataFrame, while resampling a pandas DataFrame returns a DataFrame with MultiIndex columns.
Resampling a Series
import pandas_datareader as pdr

df = pdr.DataReader("IBM", "google")  # note: the Google Finance reader has since been deprecated in pandas-datareader
prices = df["Close"]
volumes = df["Volume"]

offset = "M"  # monthly
prices_monthly = prices.resample(offset).ohlc()   # Series -> DataFrame with open/high/low/close columns
volumes_monthly = volumes.resample(offset).sum()  # Series -> Series
Resampling a DataFrame
import pandas_datareader as pdr

data = pdr.DataReader(["IBM", "MSFT"], "google")  # several symbols at once
prices = data["Close"]
volumes = data["Volume"].fillna(0).astype(int)

offset = "M"  # monthly
prices_monthly = prices.resample(offset).ohlc()   # DataFrame -> DataFrame with MultiIndex columns
volumes_monthly = volumes.resample(offset).sum()
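The API asymmetry described above can be seen without a live data source. Here is a minimal self-contained illustration with synthetic data (the month-start offset "MS" and the two column names are arbitrary choices for the example):

```python
import pandas as pd
import numpy as np

# Synthetic daily prices for two instruments
idx = pd.date_range("2017-01-01", periods=90, freq="D")
np.random.seed(1)
ser = pd.Series(np.random.rand(90), index=idx)       # one instrument
frame = pd.DataFrame({"IBM": ser, "MSFT": ser * 2})  # two instruments

# Series.resample().ohlc() -> plain open/high/low/close columns
print(ser.resample("MS").ohlc().columns)

# DataFrame.resample().ohlc() -> MultiIndex columns: (symbol, field)
print(frame.resample("MS").ohlc().columns)
```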
My guess is that resample could be done in a similar way to how we handle rolling currently: we maintain enough of a history of recent data. Whenever new data comes in we concatenate it to this history, perform the pandas operation, slice off the historical part, emit that data, and then keep enough of the new data to serve the next batch that comes through. If you wanted to take a look at the Rolling implementation, I suspect that would be a decent start.
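In pure pandas, that history-keeping approach might look roughly like the sketch below. StreamingResampler is a hypothetical helper invented for illustration (it is not streamz's actual Rolling code), and .sum() stands in for whatever pandas operation would ultimately be applied:

```python
import pandas as pd

class StreamingResampler:
    """Illustrative sketch: keep the trailing, possibly incomplete bin
    as state; on each new chunk, concatenate, resample, emit the
    complete bins, and retain the raw rows of the last bin."""

    def __init__(self, offset):
        self.offset = offset
        self.history = None  # raw rows belonging to the unfinished bin

    def update(self, chunk):
        data = chunk if self.history is None else pd.concat([self.history, chunk])
        resampled = data.resample(self.offset).sum()  # stand-in operation
        if len(resampled) <= 1:
            # everything still falls into one (possibly incomplete) bin
            self.history = data
            return resampled.iloc[0:0]  # emit nothing yet
        # the last bin may still grow: keep its raw rows for the next call
        last_edge = resampled.index[-1]
        self.history = data[data.index >= last_edge]
        return resampled.iloc[:-1]  # emit only the complete bins
```

Feeding the same data in chunks then yields the same result as a one-shot resample, minus the still-open last bin.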
I'm not sure I will have enough time, or enough knowledge of how streamz works, to help with this. Really sorry, but it could be a very nice feature.
Here is some sample code to generate fake trade-like data:
from streamz import Stream
import pandas as pd
import numpy as np

# One year of fake 1-minute trades
index = pd.date_range("2017-1-1", "2018-1-1", freq="1Min")[:-1]
n = len(index)
df = pd.DataFrame(index=index)
np.random.seed(123)
df["direction"] = np.random.choice([1, -1], n)  # buy (+1) / sell (-1)
df["price"] = 100 + pd.Series(np.random.rand(n) - 0.5, index=index).cumsum()
df["volume"] = (pd.Series(np.random.rand(n) * 1000, index=index)).astype(int)
print(df)

# Iterate row by row, as a live feed would deliver them
for row in df.iterrows():
    print(row)

for t in df.itertuples():
    print(t)
or tick-like data:
import pandas as pd
import numpy as np

# One year of fake 1-minute bid/ask quotes
index = pd.date_range("2017-1-1", "2018-1-1", freq="1Min")[:-1]
n = len(index)
df = pd.DataFrame(index=index)
np.random.seed(123)
df["bid"] = 100 + pd.Series(np.random.rand(n) - 0.5, index=index).cumsum()
df["spread"] = pd.Series(np.random.rand(n), index=index)
df["ask"] = df["bid"] + df["spread"]
df["volume"] = (pd.Series(np.random.rand(n) * 1000, index=index)).astype(int)
df = df[["ask", "bid", "spread", "volume"]]
Resampling a DataFrame that's already OHLCV with .ohlc() is problematic, since pandas will resample each column into its own OHLC: you get an OHLC for the O, for the H, for the L, and for the C. The whole MultiIndex mess... To avoid this, you have to resample each series inside the DataFrame by its own rule.
Something like this:
pd.DataFrame(
    {"O": ohlc_prices.O.resample(offset).first(),
     "H": ohlc_prices.H.resample(offset).max(),
     "L": ohlc_prices.L.resample(offset).min(),
     "C": ohlc_prices.C.resample(offset).last(),
     "V": volumes.resample(offset).sum()})
Alternatively, resample can be combined with agg, passing one aggregation rule per column:
df.resample(offset).agg({'Open':'first', 'High':'max', 'Low':'min', 'Close':'last', 'Volume':'sum'})
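To make that concrete, here is a runnable sketch on synthetic OHLCV bars (the column names, the 5-minute source frequency, and the hourly target are made up for illustration):

```python
import pandas as pd
import numpy as np

# Synthetic 5-minute OHLCV bars (illustrative data, not a real feed)
idx = pd.date_range("2017-01-01", periods=24, freq="5min")
np.random.seed(42)
close = 100 + np.random.randn(24).cumsum()
df = pd.DataFrame({
    "Open": close + 0.1, "High": close + 0.5,
    "Low": close - 0.5, "Close": close,
    "Volume": np.random.randint(100, 1000, 24),
}, index=idx)

# Downsample to hourly bars with one rule per column -- no MultiIndex
hourly = df.resample("1h").agg({
    "Open": "first", "High": "max",
    "Low": "min", "Close": "last", "Volume": "sum",
})
print(hourly)
```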
In related news, @shwina / the rapids/cudf folks have merged groupby frequency and resampling functionality into cudf v21.12: https://github.com/rapidsai/cudf/pull/9178
Hello,
Let's imagine you are receiving trades from an exchange (buy/sell, price, volume).
Streamz should provide a way to resample this kind of data live:
price could be resampled to OHLC (Open, High, Low, Close) and volume could be resampled with a sum.
Ideally, pandas offset aliases should be supported: https://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
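A minimal pandas-only sketch of the requested behaviour, on synthetic trades (a streamz version would apply the same two resample rules to each incoming batch; the 1-second tick rate and 1-minute bars are arbitrary choices for the example):

```python
import pandas as pd
import numpy as np

# Synthetic trades: one (price, volume) pair per second (illustrative only)
idx = pd.date_range("2017-01-01", periods=300, freq="s")
np.random.seed(123)
trades = pd.DataFrame({
    "price": 100 + pd.Series(np.random.rand(300) - 0.5, index=idx).cumsum(),
    "volume": np.random.randint(1, 100, 300),
}, index=idx)

# price -> OHLC, volume -> sum, at any pandas offset alias (here 1 minute)
bars = pd.concat(
    [trades["price"].resample("1min").ohlc(),
     trades["volume"].resample("1min").sum()],
    axis=1,
)
print(bars)
```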
Kind regards