AI4Finance-Foundation / FinRL

FinRL: Financial Reinforcement Learning. 🔥
https://ai4finance.org
MIT License

RayTune example throws TypeError: 'int' object is not callable #475

Closed · orthosku closed this issue 2 years ago

orthosku commented 2 years ago

I'm pretty new to Python, so I'm unable to determine where the issue is. This is from the Ray Tune notebook in the repo.

Having an issue with this function:

def DRL_prediction(model_name,test_env_config,env,model_config,agent_path,env_name_test='StockTrading_test_env'):
    env_instance = env(test_env_config)

    register_env(env_name_test, lambda config: env(test_env_config))
    model_config['env'] = env_name_test
    # ray.init() # Other Ray APIs will not work until `ray.init()` is called.
    if model_name == "ppo":
        trainer = MODELS[model_name].PPOTrainer(config=model_config)
    elif model_name == "a2c":
        trainer = MODELS[model_name].A2CTrainer(config=model_config)
    elif model_name == "ddpg":
        trainer = MODELS[model_name].DDPGTrainer(config=model_config)
    elif model_name == "td3":
        trainer = MODELS[model_name].TD3Trainer(config=model_config)
    elif model_name == "sac":
        trainer = MODELS[model_name].SACTrainer(config=model_config)

    try:
        trainer.restore(agent_path)
        print("Restoring from checkpoint path", agent_path)
    except BaseException:
        raise ValueError("Fail to load agent!")

    # test on the testing env
    state = env_instance.reset()
    episode_returns = list()  # the cumulative_return / initial_account
    episode_total_assets = list()
    episode_total_assets.append(env_instance.initial_total_asset)
    done = False
    while not done:
        action = trainer.compute_single_action(state)
        state, reward, done, _ = env_instance.step(action)

        total_asset = (env_instance.amount + (env_instance.price_ary[env_instance.day] * env_instance.stocks).sum())
        episode_total_assets.append(total_asset)
        episode_return = total_asset / env_instance.initial_total_asset
        episode_returns.append(episode_return)
    ray.shutdown()
    print("episode return: " + str(episode_return))
    print("Test Finished!")
    return episode_total_assets

episode_total_assets = DRL_prediction(model_name, test_env_config, env, model_config=best_config, agent_path=agent_path, env_name_test='StockTrading_test_env')

The error stack:

Traceback (most recent call last):
  File "<ipython-input-87-63003522c763>", line 31, in DRL_prediction
    action = trainer.compute_single_action(state)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/agents/trainer.py", line 1455, in compute_single_action
    action, state, extra = policy.compute_single_action(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/policy/policy.py", line 217, in compute_single_action
    out = self.compute_actions_from_input_dict(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/policy/torch_policy.py", line 294, in compute_actions_from_input_dict
    return self._compute_action_helper(input_dict, state_batches,
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/utils/threading.py", line 21, in wrapper
    return func(self, *a, **k)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/policy/torch_policy.py", line 948, in _compute_action_helper
    self.exploration.get_exploration_action(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/utils/exploration/gaussian_noise.py", line 98, in get_exploration_action
    return self._get_torch_exploration_action(action_distribution,
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pivo/lib/python3.8/site-packages/ray/rllib/utils/exploration/ornstein_uhlenbeck_noise.py", line 174, in _get_torch_exploration_action
    mean=torch.zeros(self.ou_state.size()), std=1.0) \
TypeError: 'int' object is not callable

I tried replacing this with the similar function from the RLlib source code, but that also threw the same error.
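Since the traceback ends at self.ou_state.size() inside RLlib's Ornstein-Uhlenbeck exploration, one hypothetical check (not part of the original post) is to inspect the restored trainer's exploration object directly, for example right after trainer.restore(agent_path) inside DRL_prediction:

# Hypothetical debugging snippet; assumes `trainer` is the restored DDPGTrainer
# built inside DRL_prediction above.
policy = trainer.get_policy()
print(type(policy.exploration))           # for DDPG this is expected to be OrnsteinUhlenbeckNoise
print(type(policy.exploration.ou_state))  # the traceback points at ou_state.size(): check whether this is a
                                          # torch tensor (where .size() is a method) or something else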

Athe-kunal commented 2 years ago

It is hard to say what is going wrong from this little information. Are you running the Ray Tune code as is? If not, what changes have you made?

Judging from the last line of the traceback, my guess is that your state is an int, so when size() is called on it you get the "'int' object is not callable" error. There is probably some issue with the state variable. Check it by printing it.
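For reference, a minimal version of that check, assuming the same env_instance built in DRL_prediction:

state = env_instance.reset()
print(type(state))                    # expect a numpy.ndarray observation, not an int
print(getattr(state, "shape", None))  # expect a 1-D observation vector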

orthosku commented 2 years ago

I've edited the above for conciseness. I am running the Ray tune code as is, no modifications.

When printing state I get the following:

state = env_instance.reset()
print(state)
[244.14062      0.49733922   0.           4.051727     4.1813035
   5.028118     0.           0.           0.           0.
   0.           0.           0.           0.           0.
   0.           0.           0.           2.08375      2.08375
   0.           0.           0.           0.           0.
   0.           2.1045313    2.1045313    0.           0.
   0.           0.           0.           0.           2.568672
   2.568672  ]

state.shape
(36,)

Also, state is an ndarray:

(screenshot showing that state is a numpy.ndarray)

Could this be related to this issue with compute_single_action, as linked below?

https://github.com/ray-project/ray/blob/cc3199b814d2c4fe719851a330124ca8b57b6355/rllib/examples/trajectory_view_api.py#L113-L123

I'm not sure that what I'm passing is the input dict that compute_single_action wants to take.
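Since the traceback ends inside the Ornstein-Uhlenbeck exploration branch (ornstein_uhlenbeck_noise._get_torch_exploration_action), one hedged workaround, a sketch rather than a confirmed fix, is to request the deterministic action during evaluation, which skips that branch entirely:

# Possible workaround in DRL_prediction's test loop (an assumption, not the
# confirmed fix): explore=False asks RLlib for the deterministic action and
# bypasses the OU-noise exploration path where the traceback ends.
while not done:
    action = trainer.compute_single_action(state, explore=False)
    state, reward, done, _ = env_instance.step(action)

Setting "explore": False in model_config before building the trainer should have the same effect, mirroring the "explore": False already used in the evaluation_config during tuning.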

Athe-kunal commented 2 years ago

Do you have the full-length code somewhere? If possible, please share it and I will go through it. The Ray library is tough to debug, so the complete codebase would be helpful.

orthosku commented 2 years ago

Sure, thanks for looking at it. I'm trying to build a DDPG model. Skipping the import steps:


def sample_ddpg_params():
    return {
        "buffer_size": tune.choice([int(1e3), int(1e5), int(1e6)]),
        "lr": tune.loguniform(1e-5, 1),
        "train_batch_size": tune.choice([32, 256, 512]),
        #"net_arch": tune.choice([[64, 64], [256, 256], [400, 300]])
    }

def sample_a2c_params():
    return {
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0]),
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(1e-5, 1)

    }

def sample_ppo_params():
    return {
        "entropy_coeff": tune.loguniform(0.00000001, 0.1),
        "lr": tune.loguniform(5e-5, 1),
        "sgd_minibatch_size": tune.choice([32, 64, 128, 256, 512]),
        "lambda": tune.choice([0.1, 0.3, 0.5, 0.7, 0.9, 1.0])
    }

MODELS = {"a2c": a2c, "ddpg": ddpg, "td3": td3, "sac": sac, "ppo": ppo}

def get_train_env(start_date, end_date, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name, if_vix=True, **kwargs):
    # fetch data
    DP = DataProcessor(data_source, **kwargs)
    data = DP.download_data(ticker_list, start_date, end_date, time_interval)
    data = DP.clean_data(data)
    data = DP.add_technical_indicator(data, technical_indicator_list)
    if if_vix:
        data = DP.add_vix(data)
    price_array, tech_array, turbulence_array = DP.df_to_array(data, if_vix)
    train_env_config = {'price_array': price_array,
                        'tech_array': tech_array,
                        'turbulence_array': turbulence_array,
                        'if_train': True}

    return train_env_config

def get_val_env(start_date, end_date, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name, if_vix=True, **kwargs):
    # fetch data
    DP = DataProcessor(data_source, **kwargs)
    data = DP.download_data(ticker_list, start_date, end_date, time_interval)
    data = DP.clean_data(data)
    data = DP.add_technical_indicator(data, technical_indicator_list)
    if if_vix:
        data = DP.add_vix(data)
    price_array, tech_array, turbulence_array = DP.df_to_array(data, if_vix)
    val_env_config = {'price_array': price_array,
                      'tech_array': tech_array,
                      'turbulence_array': turbulence_array,
                      'if_train': True}

    return val_env_config

# Function to calculate the sharpe ratio from the list of total_episode_reward
def calculate_sharpe(episode_reward: list):
    perf_data = pd.DataFrame(data=episode_reward, columns=['reward'])
    perf_data['daily_return'] = perf_data['reward'].pct_change(1)
    if perf_data['daily_return'].std() != 0:
        sharpe = (252 ** 0.5) * perf_data['daily_return'].mean() / \
                 perf_data['daily_return'].std()
        return sharpe
    else:
        return 0

def get_test_config(start_date, end_date, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name, if_vix=True, **kwargs):
    DP = DataProcessor(data_source, **kwargs)
    data = DP.download_data(ticker_list, start_date, end_date, time_interval)
    data = DP.clean_data(data)
    data = DP.add_technical_indicator(data, technical_indicator_list)

    if if_vix:
        data = DP.add_vix(data)

    price_array, tech_array, turbulence_array = DP.df_to_array(data, if_vix)
    test_env_config = {'price_array': price_array,
                       'tech_array': tech_array,
                       'turbulence_array': turbulence_array, 'if_train': False}
    return test_env_config

TRAIN_START_DATE = '2014-01-01'
TRAIN_END_DATE = '2019-07-30'

VAL_START_DATE = '2019-08-01'
VAL_END_DATE = '2020-07-30'

TEST_START_DATE = '2020-08-01'
TEST_END_DATE = '2021-10-01'

technical_indicator_list = config.TECHNICAL_INDICATORS_LIST

model_name = 'ddpg'
env = StockTradingEnv_numpy
ticker_list = ['SPY', 'QQQ', 'DIA']
data_source = 'yahoofinance'
time_interval = '1D'

train_env_config = get_train_env(TRAIN_START_DATE, TRAIN_END_DATE, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name)
val_env_config = get_train_env(VAL_START_DATE, VAL_END_DATE, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name)

from ray.tune.registry import register_env

env_name = 'StockTrading_train_env'
register_env(env_name, lambda config: env(train_env_config))

MODEL_TRAINER = {'a2c': A2CTrainer, 'ppo': PPOTrainer, 'ddpg': DDPGTrainer}
if model_name == "ddpg":
    sample_hyperparameters = sample_ddpg_params()
elif model_name == "ppo":
    sample_hyperparameters = sample_ppo_params()
elif model_name == "a2c":
    sample_hyperparameters = sample_a2c_params()

def run_optuna_tune():
    algo = OptunaSearch()
    algo = ConcurrencyLimiter(algo, max_concurrent=5)
    scheduler = AsyncHyperBandScheduler()
    num_samples = 20
    training_iterations = 100

    analysis = tune.run(
        MODEL_TRAINER[model_name],
        metric="episode_reward_mean",  # The metric to optimize for tuning
        mode="max",  # Maximize the metric
        search_alg=algo,  # OptunaSearch method which uses Tree Parzen estimator to sample hyperparameters
        scheduler=scheduler,  # To prune bad trials
        config={**sample_hyperparameters, # passes model hyperparams
                'env': 'StockTrading_train_env',
                'num_workers': 4,
                'num_gpus': 0,
                'framework': 'torch',
                "evaluation_interval": 1,  # Run evaluation on every iteration
                "evaluation_config": {
                "env_config": val_env_config, # The dictionary we built before (only the overriding keys to use in evaluation)
                "explore": False,  # We don't want to explore during evaluation. All actions have to be repeatable.
                }},
        num_samples=num_samples,  # Number of hyperparameter configurations to try
        stop={'training_iteration': training_iterations},  # Stop each trial after this many training iterations
        verbose=1, local_dir="./tuned_models",  # Save TensorBoard logs here
        # resources_per_trial={'gpu': 1, 'cpu': 1},
        max_failures=1,  # Retry a failed trial once before giving up
        raise_on_failed_trial=False,  # Don't raise an error even if some trials fail
        keep_checkpoints_num=num_samples - 5,
        checkpoint_score_attr='episode_reward_mean',  # Keep only keep_checkpoints_num checkpoints, ranked by this score
        checkpoint_freq=training_iterations  # Checkpoint each trial at its final training iteration
    )
    print("Best hyperparameter: ", analysis.best_config)
    return analysis

analysis = run_optuna_tune()

best_config = analysis.get_best_config(metric='episode_reward_mean',mode='max')
analysis.get_best_logdir(metric='episode_reward_mean',mode='max')

agent_path = analysis.best_checkpoint
# agent_path should be the best checkpoint so that the trainer restores the correct weights
print(agent_path)

#create final test set config
test_env_config = get_test_config(TEST_START_DATE, TEST_END_DATE, ticker_list, data_source, time_interval, technical_indicator_list, env, model_name)

def DRL_prediction(model_name,test_env_config,env,model_config,agent_path,env_name_test='StockTrading_test_env'):
    env_instance = env(test_env_config)

    register_env(env_name_test, lambda config: env(test_env_config))
    model_config['env'] = env_name_test
    # ray.init() # Other Ray APIs will not work until `ray.init()` is called.
    if model_name == "ppo":
        trainer = MODELS[model_name].PPOTrainer(config=model_config)
    elif model_name == "a2c":
        trainer = MODELS[model_name].A2CTrainer(config=model_config)
    elif model_name == "ddpg":
        trainer = MODELS[model_name].DDPGTrainer(config=model_config)
    elif model_name == "td3":
        trainer = MODELS[model_name].TD3Trainer(config=model_config)
    elif model_name == "sac":
        trainer = MODELS[model_name].SACTrainer(config=model_config)

    try:
        trainer.restore(agent_path)
        print("Restoring from checkpoint path", agent_path)
    except BaseException:
        raise ValueError("Fail to load agent!")

    # test on the testing env
    state = env_instance.reset()
    episode_returns = list()  # the cumulative_return / initial_account
    episode_total_assets = list()
    episode_total_assets.append(env_instance.initial_total_asset)
    done = False
    while not done:
        action = trainer.compute_single_action(state)
        state, reward, done, _ = env_instance.step(action)

        total_asset = (
                env_instance.amount
                + (env_instance.price_ary[env_instance.day] * env_instance.stocks).sum()
        )
        episode_total_assets.append(total_asset)
        episode_return = total_asset / env_instance.initial_total_asset
        episode_returns.append(episode_return)
    ray.shutdown()
    print("episode return: " + str(episode_return))
    print("Test Finished!")
    return episode_total_assets
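Not part of the original script: a short sketch of how the returned episode_total_assets could be passed to the calculate_sharpe helper defined above (it computes the daily percentage change internally):

episode_total_assets = DRL_prediction(model_name, test_env_config, env,
                                      model_config=best_config, agent_path=agent_path)
test_sharpe = calculate_sharpe(episode_total_assets)  # annualized Sharpe ratio over the test period
print("Test Sharpe ratio:", test_sharpe)
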
Athe-kunal commented 2 years ago

I was able to implement it; please check here.