AI4Finance-Foundation / FinRL-Meta

FinRL-Meta: Dynamic datasets and market environments for FinRL.
https://ai4finance.org
MIT License

[Suggestion] Include timestamp / DatetimeIndex for plotting #48

Open cryptocoinserver opened 2 years ago

cryptocoinserver commented 2 years ago

After some more exploring, I realized that plotting (with pyfolio / quantstats) to evaluate the results is harder than it needs to be. Currently no timestamps are readily available, and they are needed for direct plotting or for using the libraries above.

The Demo_FinRL_Meta_Integrate_Trends_data_to_DOW_Jones.ipynb is a good example. It is done there, but with a (hacky) workaround using a Custom_DataProcessor / the yahoofinance dataframe timestamps.

A solution I could think of would be adding 3 functions to the env that return equal-weight portfolio returns, buy-and-hold returns and the agent returns, including the timestamps. A pd.Series with a DatetimeIndex could work.
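
A minimal sketch of what those three functions could return, assuming a wide price table (one column per ticker, indexed by timestamp) and a list of account values recorded by the agent. The function names, the toy tickers and the prices are made up for illustration and are not part of FinRL-Meta:

```python
import numpy as np
import pandas as pd


def equal_weight_returns(prices: pd.DataFrame) -> pd.Series:
    """Per-period return of a portfolio rebalanced to equal weights every step."""
    return prices.pct_change().mean(axis=1).fillna(0.0)


def buy_and_hold_returns(prices: pd.DataFrame) -> pd.Series:
    """Per-period return of buying equal cash amounts once and never rebalancing."""
    shares = 1.0 / prices.iloc[0]           # shares bought per ticker with 1 unit of cash each
    value = (prices * shares).sum(axis=1)   # portfolio value over time
    return value.pct_change().fillna(0.0)


def agent_returns(account_values, index: pd.DatetimeIndex) -> pd.Series:
    """Per-period return of the agent, computed from its account value history."""
    values = pd.Series(np.asarray(account_values, dtype=float), index=index)
    return values.pct_change().fillna(0.0)


# Toy data: two hypothetical tickers over three days.
dates = pd.date_range("2021-01-01", periods=3, freq="D")
prices = pd.DataFrame({"AAA": [100.0, 110.0, 121.0], "BBB": [50.0, 50.0, 55.0]}, index=dates)

ew = equal_weight_returns(prices)
bh = buy_and_hold_returns(prices)
ag = agent_returns([1000.0, 1010.0, 1030.2], dates)
```

Because every result is a pd.Series with a DatetimeIndex, it can be handed to plotting or tear-sheet libraries without rebuilding the timestamps first.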

This would also be good preparation for more advanced reward functions, like the (differential / deflated) Sharpe ratio or a comparison to buy-and-hold, that rely on a return series.
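
To illustrate why a return series helps here, the sketch below computes a plain annualized Sharpe ratio and its difference to a benchmark. This is the ordinary Sharpe ratio, not the differential or deflated variant, and the function names and the 252-periods-per-year default are assumptions:

```python
import numpy as np
import pandas as pd


def sharpe_ratio(returns: pd.Series, periods_per_year: int = 252, risk_free: float = 0.0) -> float:
    """Annualized Sharpe ratio of a per-period return series."""
    excess = returns - risk_free / periods_per_year
    std = excess.std(ddof=1)
    if std == 0 or np.isnan(std):
        return 0.0  # undefined for a constant or too-short series; report 0 instead
    return float(np.sqrt(periods_per_year) * excess.mean() / std)


def excess_sharpe(agent: pd.Series, benchmark: pd.Series, periods_per_year: int = 252) -> float:
    """Sharpe of the agent minus Sharpe of the benchmark on their shared timestamps."""
    agent, benchmark = agent.align(benchmark, join="inner")
    return sharpe_ratio(agent, periods_per_year) - sharpe_ratio(benchmark, periods_per_year)


dates = pd.date_range("2021-01-01", periods=4, freq="D")
agent = pd.Series([0.01, 0.02, 0.01, 0.02], index=dates)
flat = pd.Series([0.0, 0.0, 0.0, 0.0], index=dates)

agent_sharpe = sharpe_ratio(agent)
versus_flat = excess_sharpe(agent, flat)
```

The align step is where the timestamps matter: with a shared DatetimeIndex the two series are matched by date rather than by position.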

What do you think?

rayrui312 commented 2 years ago

Great suggestions! Thanks a lot. We will consider adding them in the future. If you have ideas on the specific implementation, you may share them with us through a pull request. We would much appreciate it.

cryptocoinserver commented 2 years ago

I think the issue/solution lies in return price_array, tech_array, turbulence_array in the data_processor / run function. If you passed the whole dataframe, all necessary data would be available. I'm not sure what best practice is, but I would suggest always using the main dataframe with all data. That would also be the most flexible approach, meaning the agent should also accept just the df instead of individual arrays. Additionally, this part of Demo_MultiCrypto_Trading.ipynb raised one more question:

    #process data using unified data processor
    DP = DataProcessor(data_source,  **kwargs)
    price_array, tech_array, turbulence_array = DP.run(ticker_list, start_date,
                                                       end_date, time_interval,
                                                       technical_indicator_list,
                                                       if_vix)
    data_config = {'price_array':price_array,
                   'tech_array':tech_array,
                   'turbulence_array':turbulence_array}
    #build environment using processed data
    env_instance = env(config=data_config)

    #read parameters and load agents
    current_working_dir = kwargs.get('current_working_dir','./'+str(model_name))

    if drl_lib == 'elegantrl':
        break_step = kwargs.get('break_step', 1e6)
        erl_params = kwargs.get('erl_params')

        agent = DRLAgent_erl(env = env,
                             price_array = price_array,
                             tech_array=tech_array,
                             turbulence_array=turbulence_array)

More flexible:

    DP = DataProcessor(data_source, **kwargs)
    df = DP.run(ticker_list, start_date, end_date, time_interval, technical_indicator_list, if_vix)

    agent = DRLAgent_erl(env=env, df=df)

Then, inside the agent (or environment), get the necessary columns from the df (and convert them to an array where necessary). For that to work, the columns would of course have to be named consistently, like:

With those naming conventions, you know what to get from the df inside the agent. For example with https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.filter.html you can get all columns containing "feature_" and so on.

That also makes it easy to add more / custom data features just by doing df['feature_mynewfeature']=great_feature before passing the df.
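
A small sketch of that selection step, assuming the "feature_" prefix mentioned above. The ticker, the column feature_rsi and all values are made up for illustration:

```python
import pandas as pd

df = pd.DataFrame({
    "time": pd.to_datetime(["2021-01-01", "2021-01-02"]),
    "tic": ["BTCUSDT", "BTCUSDT"],
    "close": [29000.0, 32000.0],
    "feature_rsi": [55.0, 61.0],
})

# Adding a custom feature is a plain column assignment before the df is passed on.
df["feature_mynewfeature"] = [0.1, 0.2]

# DataFrame.filter(like=...) keeps every column whose name contains the substring.
feature_df = df.filter(like="feature_")
feature_names = list(feature_df.columns)
feature_array = feature_df.to_numpy()   # converted only where an array is needed
```

Since like= matches anywhere in the name, df.filter(regex="^feature_") would be the stricter choice if only a leading prefix should count.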

I think this is related to #46

cryptocoinserver commented 2 years ago

After looking at other examples and (gym) environments, I think best practice would be doing it inside the env: https://github.com/AminHP/gym-anytrading/blob/master/gym_anytrading/envs/forex_env.py#L19 Initialize the env with the df and then process the data there. The agent should just get the env as input and then get the data via env._process_data or env._get_price_array etc.

So my suggested changes would be:

This would require changes across nearly the whole project, though. It would make this suggestion easy to implement, since the env holds the dataframe with the timestamps, and it's more or less a matter of adding this to the env:

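    def _get_buy_and_hold(self):
        # pass the raw values so pandas does not realign them against the new index
        return pd.Series(self.df['returns'].to_numpy(), index=pd.DatetimeIndex(self.df['date']))

cryptocoinserver commented 2 years ago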

Example:

    from typing import List

    import pandas as pd
    from stockstats import StockDataFrame as Sdf


    class DataProcessor:
        def __init__(self, data_source: str, **kwargs):
            self.df = pd.DataFrame()
            ...

        # all the functions just add / modify the dataframe
        def add_technical_indicator(self, tech_indicator_list: List[str]) -> None:
            """
            calculate technical indicators
            """
            # self.df.reset_index(drop=False, inplace=True)
            # self.df.drop(columns=["level_1"], inplace=True)
            # self.df.rename(columns={"level_0": "tic", "date": "time"}, inplace=True)
            stock = Sdf.retype(self.df.copy())
            unique_ticker = stock.tic.unique()

            for indicator in tech_indicator_list:
                indicator_df = pd.DataFrame()
                for i in range(len(unique_ticker)):
                    try:
                        temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                        temp_indicator = pd.DataFrame(temp_indicator)
                        temp_indicator["tic"] = unique_ticker[i]
                        temp_indicator["time"] = self.df[self.df.tic == unique_ticker[i]]["time"].to_list()
                        # DataFrame.append was removed in pandas 2.0, so use pd.concat
                        indicator_df = pd.concat([indicator_df, temp_indicator], ignore_index=True)
                    except Exception as e:
                        print(e)
                indicator_df.dropna(inplace=True)
                self.df = self.df.merge(
                    indicator_df[["tic", "time", indicator]], on=["tic", "time"], how="left"
                )
            self.df.sort_values(by=["time", "tic"], inplace=True)

        # return the (final) dataframe
        def get_df(self):
            return self.df

    DP = DataProcessor(data_source, **kwargs)
    # Get candle data (added to the dataframe)
    DP.download_data(ticker_list, start_date, end_date, time_interval)
    # Add indicators
    DP.add_technical_indicator(technical_indicator_list)
    # Very flexible, as I construct the df step by step. For example, here we skip turbulence
    # and just use candle data and indicators. Then in the end we get the df.
    df = DP.get_df()

    # build environment using dataframe
    env_instance = env(df=df)

    # agent gets only the environment
    agent = DRLAgent_erl(env=env_instance)

    class DRLAgent:
        """Provides implementations for DRL algorithms
        Attributes
        ----------
            env: gym environment class
                user-defined class
        Methods
        -------
            get_model()
                setup DRL algorithms
            train_model()
                train DRL algorithms in a train dataset
                and output the trained model
            DRL_prediction()
                make a prediction in a test dataset and get results
        """

        def __init__(self, env):
            self.env = env
            self.price_array = env._get_price_array
            self.tech_array = env._get_tech_array
            self.turbulence_array = env._get_turbulence_array

    import numpy as np
    import pandas as pd


    class Environment:
        def __init__(self, df):
            self.df = df

        @property
        def _get_price_array(self):
            unique_ticker = self.df.tic.unique()
            # one column of close prices per ticker
            price_array = np.column_stack([self.df[self.df.tic == tic].close for tic in unique_ticker])
            return price_array

        @property
        def _get_tech_array(self):
            ...

        @property
        def _get_turbulence_array(self):
            ...

        # This is very easy now using the dataframe:
        @property
        def _get_buy_and_hold(self):
            # pass the raw values so pandas does not realign them against the new index
            return pd.Series(self.df['close'].pct_change().to_numpy(), index=pd.DatetimeIndex(self.df['time']))

zhumingpassional commented 2 years ago

Your suggestion is good. We are considering using df['key'] to pass the data. We are also considering using a dict, d['key']: np.array, to pass data, which is more efficient. We described this idea in #46.

With respect to the data processors: currently we use several functions. Some data may be missing, so we use bfill or ffill to fill the missing values. Users who want to process the data in their own way should define their own functions. We will consider using one function that returns the data.
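
For reference, a hedged sketch of what such a fill step could look like on a long-format dataframe. The function name fill_missing and the toy data are hypothetical, and the columns tic, time and close are assumed from the examples earlier in this thread:

```python
import numpy as np
import pandas as pd


def fill_missing(df: pd.DataFrame, value_cols) -> pd.DataFrame:
    """Forward-fill, then back-fill, each ticker separately so values never leak across tickers."""
    df = df.sort_values(["tic", "time"]).copy()
    df[value_cols] = df.groupby("tic")[value_cols].transform(lambda s: s.ffill().bfill())
    return df.sort_values(["time", "tic"]).reset_index(drop=True)


times = pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-03"])
raw = pd.DataFrame({
    "time": list(times) * 2,
    "tic": ["AAA"] * 3 + ["BBB"] * 3,
    "close": [np.nan, 2.0, np.nan, 5.0, np.nan, 7.0],
})

filled = fill_missing(raw, ["close"])
```

Forward-filling first means a gap is filled with the last known value, and the back-fill only covers gaps at the very start of a ticker's history.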

TheSnowGuru commented 2 years ago

@cryptocoinserver great idea

cryptocoinserver commented 2 years ago

> Your suggestion is good. We consider to use the df['key'] to pass the data. We also consider using dict, d['key']: np.array to pass data, which is more efficient. In #46, we described this idea.

I still think the dataframe approach inside the data processor would be better. It has much better usability. The conversion to numpy for performance makes sense, but isn't it enough to do it in the environment at _get_price_array, or maybe even later in the agent's __init__?

> With respect to data processors. Now, we use several functions. In fact, some data may be missing, so we use bfill or ffill to fill the missing values. If users want to process it in their own way, they should define their own functions. We will consider to use one function and return the data.

Yes, but couldn't you handle that in the individual processor, either directly in download_data or by adding DP.clean_data(), like this:

    DP = DataProcessor(data_source, **kwargs)
    # Get candle data (added to the dataframe)
    DP.download_data(ticker_list, start_date, end_date, time_interval)
    DP.clean_data()
    # Add indicators
    DP.add_technical_indicator(technical_indicator_list)
    # Very flexible, as I construct the df step by step. For example, here we skip turbulence
    # and just use candle data and indicators. Then in the end we get the df.
    df = DP.get_df()
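
One possible shape for such a clean_data() step, shown on a toy stand-in class rather than the real DataProcessor. The class ToyDataProcessor, its load_records method (which replaces download_data so the sketch runs offline) and the cleaning rules are all assumptions:

```python
import pandas as pd


class ToyDataProcessor:
    """Hypothetical stand-in for a processor that builds self.df step by step."""

    def __init__(self) -> None:
        self.df = pd.DataFrame()

    def load_records(self, records) -> None:
        # Stand-in for download_data(): takes in-memory rows instead of calling an API.
        self.df = pd.DataFrame(records)

    def clean_data(self) -> None:
        df = self.df.copy()
        df["time"] = pd.to_datetime(df["time"])
        # Keep one bar per (time, tic); the last occurrence wins.
        df = df.drop_duplicates(subset=["time", "tic"], keep="last")
        # Drop bars without a close price instead of guessing a value.
        df = df.dropna(subset=["close"])
        self.df = df.sort_values(["time", "tic"]).reset_index(drop=True)

    def get_df(self) -> pd.DataFrame:
        return self.df


dp = ToyDataProcessor()
dp.load_records([
    {"time": "2021-01-02", "tic": "AAA", "close": 11.0},
    {"time": "2021-01-01", "tic": "AAA", "close": 10.0},
    {"time": "2021-01-01", "tic": "AAA", "close": 10.5},   # duplicate bar
    {"time": "2021-01-02", "tic": "BBB", "close": None},   # missing close
])
dp.clean_data()
cleaned = dp.get_df()
```

Keeping the cleaning in its own method lets a user subclass the processor and override only that step.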

I think the example I posted would be the most "pythonic" way to do it, and deep learning best practice too, making everything more flexible and structured. Currently the different data processors differ in function names, column names (for example 'date' vs 'time') and usage. For example, some use .run() to return the final data, while the Alpaca processor uses fetch_latest_data().
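
As an illustration of unifying the column names, the sketch below maps a few aliases onto one convention. Only the 'date' vs 'time' difference comes from this thread; the other aliases, the lowercasing and the function name are assumptions:

```python
import pandas as pd

# Hypothetical alias table; only "date" -> "time" is taken from the discussion above.
COLUMN_ALIASES = {"date": "time", "timestamp": "time", "ticker": "tic", "symbol": "tic"}


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Lowercase all column names, apply the alias table and parse the time column."""
    out = df.rename(columns=lambda c: COLUMN_ALIASES.get(str(c).lower(), str(c).lower()))
    out["time"] = pd.to_datetime(out["time"])
    return out


source_a = pd.DataFrame({"date": ["2021-01-01"], "symbol": ["AAA"], "Close": [10.0]})
source_b = pd.DataFrame({"time": ["2021-01-01"], "tic": ["AAA"], "close": [10.0]})

unified_a = standardize_columns(source_a)
unified_b = standardize_columns(source_b)
```

Running every processor's output through one such function would let downstream code rely on a single set of names.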

I understand, though, that this requires many changes and that you aren't excited about that. I believe it would be time well invested, as it would make many future implementations much easier. It would also make things a lot more consistent and easier to maintain. I could help with it if you are interested.

zhumingpassional commented 2 years ago

The dataframe (or, in the future, numpy) inside the class is good. We will update it in the next few days.

> Currently all the different data processors have differences regarding function names, varying column names (for example 'date' vs 'time') and usage. For example some use .run() to return the final data and alpaca fetch_latest_data().

Response: In the new version, we have revised it to ensure all the column names are the same. The function fetch_latest_data should be deleted, since download_data can replace it.

cryptocoinserver commented 2 years ago

Awesome. BTW, regarding the decision between the dataframe and numpy: using a dataframe for as long as possible ensures compatibility with other libraries. An example would be tsfresh, which is an incredible library to generate more features and help with feature selection / filtering. Using the dataframe approach like this, and converting to numpy later in env._get_price_array or the agent, would make using it very easy:

    DP = DataProcessor(data_source, **kwargs)
    # Get candle data (added to the dataframe)
    DP.download_data(ticker_list, start_date, end_date, time_interval)
    # Add indicators
    DP.add_technical_indicator(technical_indicator_list)
    # Add features
    DP.add_upper_shadow_feature()
    DP.add_lower_shadow_feature()
    DP.add_parkinson_volatility_feature()
    # Very flexible, as I construct the df step by step. For example, here we skip turbulence
    # and just use candle data and indicators. Then in the end we get the df.
    df = DP.get_df()

    from tsfresh.utilities.dataframe_functions import roll_time_series
    df_rolled = roll_time_series(df, column_id="tic", column_sort="time")

    from tsfresh import extract_features
    df_features = extract_features(df_rolled, column_id="tic", column_sort="time")

Thank you for your great work.

zhumingpassional commented 2 years ago

It's a pleasure to get your suggestions.

We will use several incredible libraries such as tsfresh to calculate more features.

Converting to numpy can be done as the final step, based on the generated dataframe, which is easy. Users can set this option in the config.

We will update the code in a few days.
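
A rough sketch of such a final conversion step, assuming a long-format dataframe with time, tic and close columns. The function names, the config key as_numpy and the toy data are hypothetical:

```python
import numpy as np
import pandas as pd


def to_arrays(df: pd.DataFrame, feature_cols) -> dict:
    """Pivot the long dataframe into (time x ticker) arrays, keeping the timestamps."""
    price = df.pivot(index="time", columns="tic", values="close")
    feats = df.pivot(index="time", columns="tic", values=feature_cols)
    return {
        "time": price.index,              # DatetimeIndex, still available for plotting
        "price_array": price.to_numpy(),
        "tech_array": feats.to_numpy(),   # columns ordered feature by feature, then ticker
    }


def prepare(df: pd.DataFrame, feature_cols, config: dict):
    """Return arrays only if the (hypothetical) 'as_numpy' option is set, else the df itself."""
    if config.get("as_numpy", False):
        return to_arrays(df, feature_cols)
    return df


df = pd.DataFrame({
    "time": pd.to_datetime(["2021-01-01", "2021-01-01", "2021-01-02", "2021-01-02"]),
    "tic": ["AAA", "BBB", "AAA", "BBB"],
    "close": [1.0, 10.0, 2.0, 20.0],
    "rsi": [50.0, 60.0, 51.0, 61.0],
})

arrays = prepare(df, ["rsi"], {"as_numpy": True})
same_df = prepare(df, ["rsi"], {})
```

Returning the timestamps next to the arrays keeps the plotting use case from the start of this issue working even when numpy is selected.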