ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.7k stars 5.73k forks source link

Assertion Error on Seq lens for PPO with Attention only in evaluation. #22266

Open avacaondata opened 2 years ago

avacaondata commented 2 years ago

Search before asking

Ray Component

RLlib

What happened + What you expected to happen

I trained a PPO model with attention, the full config is this:

{
            "env": "SimpleCryptoEnv",  # "CartPole-v0", #
            # "env_config": config_train,  # The dictionary we built before
            "log_level": "WARNING",
            "framework": "torch",
            "_fake_gpus": False,
            "callbacks": MyCallback,
            "ignore_worker_failures": True,
            "num_workers": 12,  # One worker per agent. You can increase this but it will run fewer parallel trainings.
            "num_envs_per_worker": 1,
            "num_gpus": 1,  # I yet have to understand if using a GPU is worth it, for our purposes, but I think it's not. This way you can train on a non-gpu enabled system.
            "clip_rewards": True,
            # "lr": 1e-4,  # Hyperparameter grid search defined above
            # "gamma": 0.99,  # This can have a big impact on the result and needs to be properly tuned (range is 0 to 1)
            # "lambda": 1.0,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [256, 256],  # Hyperparameter grid search defined above
                "use_attention": True,
                "attention_use_n_prev_actions": 64,
                "attention_use_n_prev_rewards": 64,
                "vf_share_layers": True,
            },
            #"num_sgd_iter": 10,  # tune.choice([10, 20, 30]),
            "sgd_minibatch_size": 1024, # 128  # tune.choice([128, 512, 2048]),
            "train_batch_size": 32768, # , # 1024 # tune.choice([10000, 20000, 40000]),
            "evaluation_interval": 1,  # Run evaluation on every iteration
            "vf_clip_param": 300000, 
            "evaluation_config": {
                "env_config": config_eval,  # The dictionary we built before (only the overriding keys to use in evaluation)
                "explore": False,  # We don't want to explore during evaluation. All actions have to be repeatable.
            },
        }

It trains properly, but when I try to use it for evaluation, everything crashes:

agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})

I've also tried passing only the observation. I think that is the proper way: it makes no sense for the developer to have to work out how the agent expects the previous (nonexistent) states to be supplied. If this is handled automatically during training, it should be handled automatically during evaluation too. However, it doesn't work that way either.

This is the full trace of the error.

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_348/535668899.py in <module>
----> 1 agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\agents\trainer.py in compute_single_action(self, observation, state, prev_action, prev_reward, info, input_dict, policy_id, full_fetch, explore, timestep, episode, unsquash_action, clip_action, unsquash_actions, clip_actions, **kwargs)
   1483         if input_dict is not None:
   1484             input_dict[SampleBatch.OBS] = observation
-> 1485             action, state, extra = policy.compute_single_action(
   1486                 input_dict=input_dict,
   1487                 explore=explore,

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\policy.py in compute_single_action(self, obs, state, prev_action, prev_reward, info, input_dict, episode, explore, timestep, **kwargs)
    216             episodes = [episode]
    217 
--> 218         out = self.compute_actions_from_input_dict(
    219             input_dict=SampleBatch(input_dict),
    220             episodes=episodes,

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\torch_policy.py in compute_actions_from_input_dict(self, input_dict, explore, timestep, **kwargs)
    292                 if state_batches else None
    293 
--> 294             return self._compute_action_helper(input_dict, state_batches,
    295                                                seq_lens, explore, timestep)
    296 

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\utils\threading.py in wrapper(self, *a, **k)
     19         try:
     20             with self._lock:
---> 21                 return func(self, *a, **k)
     22         except AttributeError as e:
     23             if "has no attribute '_lock'" in e.args[0]:

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\torch_policy.py in _compute_action_helper(self, input_dict, state_batches, seq_lens, explore, timestep)
    932             else:
    933                 dist_class = self.dist_class
--> 934                 dist_inputs, state_out = self.model(input_dict, state_batches,
    935                                                     seq_lens)
    936 

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\models\modelv2.py in __call__(self, input_dict, state, seq_lens)
    241 
    242         with self.context():
--> 243             res = self.forward(restored, state or [], seq_lens)
    244 
    245         if isinstance(input_dict, SampleBatch):

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\models\torch\attention_net.py in forward(self, input_dict, state, seq_lens)
    345                 state: List[TensorType],
    346                 seq_lens: TensorType) -> (TensorType, List[TensorType]):
--> 347         assert seq_lens is not None
    348         # Push obs through "unwrapped" net's `forward()` first.
    349         wrapped_out, _ = self._wrapped_forward(input_dict, [], None)

AssertionError: 

Versions / Dependencies

ray '2.0.0.dev0' python 3.8 Windows 10

Reproduction script

# Reproduction script: build a PPO trainer whose model has use_attention=True
# on CartPole-v0, then call compute_single_action() directly (outside of
# training) in the two ways described in the report.
import ray
from ray.rllib.agents import ppo
from ray.tune.registry import register_env
from ray.rllib.agents.ppo import DEFAULT_CONFIG
import gym
# Start from PPO's default config and override only the keys below.
config = DEFAULT_CONFIG.copy()
config.update(
    {
            "env":  "CartPole-v0", #
            # "env_config": config_train,  # The dictionary we built before
            "log_level": "WARNING",
            "framework": "torch",
            "_fake_gpus": False,
            # NOTE(review): MyCallback is neither defined nor imported in this script — confirm it is available before running.
            "callbacks": MyCallback,
            "ignore_worker_failures": True,
            "num_workers": 12,  # One worker per agent. You can increase this but it will run fewer parallel trainings.
            "num_envs_per_worker": 1,
            "num_gpus": 1,  # I yet have to understand if using a GPU is worth it, for our purposes, but I think it's not. This way you can train on a non-gpu enabled system.
            "clip_rewards": True,
            # "lr": 1e-4,  # Hyperparameter grid search defined above
            # "gamma": 0.99,  # This can have a big impact on the result and needs to be properly tuned (range is 0 to 1)
            # "lambda": 1.0,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [256, 256],  # Hyperparameter grid search defined above
                # The attention wrapper is what triggers the reported failure.
                "use_attention": True,
                "attention_use_n_prev_actions": 64,
                "attention_use_n_prev_rewards": 64,
                "vf_share_layers": True,
            },
            #"num_sgd_iter": 10,  # tune.choice([10, 20, 30]),
            "sgd_minibatch_size": 1024, # 128  # tune.choice([128, 512, 2048]),
            "train_batch_size": 32768, # , # 1024 # tune.choice([10000, 20000, 40000]),
            "evaluation_interval": 1,  # Run evaluation on every iteration
            "vf_clip_param": 300000, 
        }
)

ray.init(num_gpus=1)
agent = ppo.PPOTrainer(config=config, env="CartPole-v0")
env = gym.make("CartPole-v0")

episode_reward = 0
done = False
obs = env.reset()
# Variant 1: supply observation, state and previous action/reward via input_dict.
# The traceback posted with this report was produced by this call and ends in
# `assert seq_lens is not None` inside attention_net.py's forward().
agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})
# or
# Variant 2: supply only the observation (reported as failing as well).
agent.compute_single_action(obs)

Anything else

@deanwampler @ericl @richardliaw

Are you willing to submit a PR?

amogkam commented 2 years ago

cc @avnishn

Kylecrif commented 2 years ago

Getting this error as well for "use_lstm": True on ray 1.11.0. I've verified that using the default fully-connected model works fine

gjoliver commented 2 years ago

Understand the pain. Thanks for the report! We are introducing changes in RLlib 2.0 that will make inference/evaluation (pretty much this exact use case) much smoother.

mvccn commented 2 years ago

Meanwhile, as we wait for 2.0, what should we do now? Should we pass the previous observations to compute_single_action? A code example for this would be very helpful.

gjoliver commented 2 years ago

The output of compute_single_action() is a 3-tuple: action, state, and fetch_dict. Essentially, I think you want to save the current state output and pass it in with the next compute_single_action() call. You can use trainer.get_policy().get_initial_state() to get the input state for your first compute_single_action() call.

malintha commented 2 years ago

For the LSTM, you have to pass the state parameter with full_fetch=True on rllib 2.0.

# Initial recurrent state: two zero vectors of length lstm_cell_size
# (presumably the LSTM hidden and cell states — confirm against the model).
state=[np.zeros(params['model_config']['lstm_cell_size'], np.float32),
           np.zeros(params['model_config']['lstm_cell_size'], np.float32)] 
# The state has to be passed in explicitly; building it alone has no effect.
# With full_fetch=True the call returns a 3-tuple (action, state, extra
# fetches), so unpack it and keep the returned state for the next call.
action, state, _ = algo_agent.compute_single_action(observation=obs[agt], state=state, policy_id=policy_id, explore=False, full_fetch=True) 

This works fine for me for an LSTM module, but what's troubling is getting an attention module working. I will update if I can find a solution to that.

brendk commented 1 year ago

Hi, this works for me with use_attention=True

# Roll out a single episode with a policy trained with use_attention=True.
# Each compute_single_action() call receives the current observation plus the
# trailing windows of past observations, actions, rewards and model states.
env = env_creator(test_env_config)
episode_reward = 0
obs = env.reset()

# Read the model settings once; they are invariant during the rollout.
model_config = config.to_dict()["model"]
n_prev_actions = model_config["attention_use_n_prev_actions"]
n_prev_rewards = model_config["attention_use_n_prev_rewards"]
memory_inference = model_config["attention_memory_inference"]

# Pre-fill every history with placeholder entries so that the very first step
# already has a full window to slice.
state = np.zeros(model_config["attention_dim"])
# NOTE(review): the observation window is sized by
# attention_use_n_prev_actions, exactly as in the original snippet — confirm
# this is the intended length.
prev_obs_list = [obs for _ in range(n_prev_actions)]
# Assumes the action space has a non-empty `shape` — TODO confirm for your env.
prev_actions_list = [
    [0] * env.action_space.shape[0] for _ in range(n_prev_actions)
]
prev_rewards_list = [0 for _ in range(n_prev_rewards)]
prev_states_list = [state for _ in range(memory_inference)]

while True:
    input_dict = {
        "obs": obs,
        "prev_n_obs": np.stack(prev_obs_list[-n_prev_actions:]),
        "prev_actions": np.stack(prev_actions_list[-n_prev_actions:]),
        "prev_rewards": np.stack(prev_rewards_list[-n_prev_rewards:]),
        "state_in": np.stack(prev_states_list[-memory_inference:]),
    }
    # full_fetch=True makes the call return (action, state, extra fetches).
    action, state, _ = algo.compute_single_action(
        input_dict=input_dict, full_fetch=True
    )
    obs, reward, done, info = env.step(action)
    episode_reward += reward
    # Extend the histories; only their trailing windows are used next step.
    prev_obs_list.append(obs)
    prev_actions_list.append(action)
    prev_rewards_list.append(reward)
    prev_states_list.append(state[0])
    if done:
        print("info", info)
        break
lyzyn commented 9 months ago

对于 LSTM,在 rllib 2.0 上您必须传入 state 参数,并设置 full_fetch=True。

# Initial recurrent state: two zero vectors of length lstm_cell_size
# (presumably the LSTM hidden and cell states — confirm against the model).
state=[np.zeros(params['model_config']['lstm_cell_size'], np.float32),
           np.zeros(params['model_config']['lstm_cell_size'], np.float32)] 
# The state has to be passed in explicitly; building it alone has no effect.
# With full_fetch=True the call returns a 3-tuple (action, state, extra
# fetches), so unpack it and keep the returned state for the next call.
action, state, _ = algo_agent.compute_single_action(observation=obs[agt], state=state, policy_id=policy_id, explore=False, full_fetch=True) 

对于 LSTM 模块,这对我来说工作正常,但麻烦的是如何让注意力模块正常工作。如果我找到解决方案,我会更新。

Hello, I also encountered an LSTM error during predict. May I ask if the solution you provided is to modify it in the predict file? I am a beginner and the questions I ask may be a bit basic. Here is my prediction file, and I don't know where to modify it. Thank you! Looking forward to your reply!


# Prediction script: locate the experiment directory, rebuild the config,
# restore the best checkpoint as `predictor`, then, for each scheduling step,
# draw `num_samples` actions per agent and step the environment with them.
# NOTE(review): the posted snippet ends inside the sampling loop; names such as
# os, re, np, time, tqdm, parser, checkpoint_str, config_creator, env_creator,
# ExperimentAnalysis, Algorithm and register_env come from outside this excerpt.
user_path = os.path.expanduser("~")
# NOTE(review): this path uses "ray_result" while the commented-out line below
# uses "ray_results" — confirm which directory name is correct.
checkpoint_subpath = os.path.join(user_path, r"ray_result\PPO", checkpoint_str)
entries = os.scandir(checkpoint_subpath)
# Pick the most recently modified sub-directory.
latest_dir = max((e for e in entries if e.is_dir()), key=lambda e: e.stat().st_mtime)
# Replace every "_" followed by one or more zeros with "-" in the directory name.
# (checkpoint_subidx is not referenced again in the visible part of the script.)
checkpoint_subidx = re.sub(r"_0+", "-", latest_dir.name)
# checkpoint_path = os.path.join(user_path, r"ray_results\PPO", checkpoint_str)         
checkpoint_path = os.path.join(user_path, r"ray_result\PPO")
config_path = os.path.join(checkpoint_path, "params.pkl")

# Parse an empty argument list, i.e. do not read the real command line.
args = vars(parser.parse_args(args=[]))
args['checkpoint'] = checkpoint_path

# Load the experiment and select the first trial's checkpoint with the highest
# episode_reward_mean.
analysis = ExperimentAnalysis(checkpoint_path)
trial = analysis.trials[0]
best_checkpoint = analysis.get_best_checkpoint(trial, metric="episode_reward_mean", mode="max")
best_checkpoint_path_str = best_checkpoint.to_directory()
best_checkpoint_dir = os.path.dirname(best_checkpoint_path_str)
print(best_checkpoint_path_str)

# Build a fresh config and copy selected values over from the trial's config.
config_from_file = trial.config
config = config_creator(**args)
config['framework'] = config_from_file['framework']
config['lr'] = config_from_file['lr']
config['train_batch_size'] = config_from_file['train_batch_size']
env_name = args["env_name"]
register_env(env_name, env_creator)
env_config = config["env_config"]
env = env_creator(env_config)
obs_space = env.observation_space
act_space = env.action_space
print(config)

# Restore the trained algorithm used for inference below.
predictor = Algorithm.from_checkpoint(best_checkpoint)

# Keep a copy of the initial observation; every sample below starts from it.
obs, _ = env.reset()
obs_0 = obs.copy()

num_steps = 96
num_samples = 50

# Result containers, keyed by step index (and by sample index where nested).
calc_time_tot = np.zeros((num_steps, num_samples))
total_reward_tot = np.zeros(num_steps)
raw_action_tot = {}
action_power_tot = {}
best_iter_idx_tot = {}
total_reward_steps = {}
bus_voltage_tot = {}
line_trans_power_tot = {}
step_cost_tot = {}
shapley_value_tot = {}
Excess_return_tot = {}
Prosumer_power_tot = {}
HVAC_zone_temp_tot = {}
HVAC_key_word = 'HVAC'
# Iterates num_steps - 1 times (step indices 0 .. num_steps - 2).
for i_step in range(num_steps-1):

    print("\033[36;1mcurrent step: {}, current scheduling time: {}\033[0m".format(env.episode_step, env.time))

    # Per-step scratch containers, one entry per sample.
    total_reward_step = np.zeros(num_samples)
    action_step = {}
    raw_action_step = {}
    real_power_step = {}
    bus_voltage_tot[i_step] = {}
    bus_voltage_step = {}
    line_trans_power_tot[i_step] = {}
    line_trans_power_step = {}
    step_cost_tot[i_step] = {}
    step_cost_step = {}
    shapley_value_tot[i_step] = {}
    shapley_value_step = {}
    Excess_return_tot[i_step] = {}
    Excess_return_step = {}
    Prosumer_power_tot[i_step] = {}
    Prosumer_power_step = {}
    HVAC_zone_temp_step = {}

    for iter_pred in tqdm(range(num_samples), desc='num_samples:'):
        # Each sample restarts from a copy of the initial observation obs_0.
        obs = obs_0.copy()
        # obs = copy.deepcopy(obs_0)
        action_step[iter_pred] = {}
        raw_action_step[iter_pred] = {}
        real_power_step[iter_pred] = {}
        bus_voltage_step[iter_pred] = {}
        line_trans_power_step[iter_pred] = {}
        step_cost_step[iter_pred] = {}
        shapley_value_step[iter_pred] = {}
        Excess_return_step[iter_pred] = {}
        Prosumer_power_step[iter_pred] = {}
        HVAC_zone_temp_step[iter_pred] = {}

        for agent_id, agent_obs in obs.items():
            policy_id = config['multiagent']['policy_mapping_fn'](agent_id, episode=None, worker=None)
            start = time.time()

            # NOTE(review): no `state` is passed here. If the policy uses an LSTM
            # or attention model, this is presumably where the recurrent state
            # must be supplied and the returned state stored — confirm.
            action_step[iter_pred][agent_id] = predictor.compute_single_action(agent_obs, policy_id=policy_id)

            end = time.time()
            calc_time_tot[i_step, iter_pred] += end - start

            raw_action_step[iter_pred][agent_id] = {}

            real_power_step[iter_pred][agent_id] = {}

            HVAC_zone_temp_step[iter_pred][agent_id] = {}
            # Post-process the agent's action for each of its sub-environments.
            for e in env.agent_dict[agent_id].env_dict:    # e: str -> PVone
                _env = env.agent_dict[agent_id].env_dict[e]
                raw_action_step[iter_pred][agent_id][e] = _env.calc_raw_action(action_step[iter_pred][agent_id][e])
                real_power_step[iter_pred][agent_id][e] = _env.calc_real_power(action_step[iter_pred][agent_id][e])
                if HVAC_key_word in e:
                    HVAC_zone_temp_step[iter_pred][agent_id][e] = _env.calc_zone_temp(action_step[iter_pred][agent_id][e])

            # NOTE(review): `start` is not reset after the first measurement, so
            # this second `end - start` includes the inference time already
            # added above — confirm whether double counting is intended.
            end = time.time()
            calc_time_tot[i_step, iter_pred] += end - start

        # env.step() is unpacked into 11 values here; the first and fifth are discarded.
        _, reward, ter, tru, _, bus_voltage_step[iter_pred], line_trans_power_step[iter_pred], step_cost_step[iter_pred], shapley_value_step[iter_pred], Excess_return_step[iter_pred], Prosumer_power_step[iter_pred] = env.step(action_step[iter_pred], calc_rew=True)
        # done = done['__all__']
        ter = ter['__all__']
        tru = tru['__all__']

        # Total reward of this sample: sum over the values of the reward dict.
        total_reward_step[iter_pred] = sum(reward.values())
lyzyn commented 9 months ago

先搜索后询问

  • [x] 我搜索了这些问题,没有发现类似的问题。

Ray 组件

RLlib

发生了什么 + 你期望会发生什么

我训练了一个带注意力机制(attention)的 PPO 模型,完整的配置是这样的:

{
            "env": "SimpleCryptoEnv",  # "CartPole-v0", #
            # "env_config": config_train,  # The dictionary we built before
            "log_level": "WARNING",
            "framework": "torch",
            "_fake_gpus": False,
            "callbacks": MyCallback,
            "ignore_worker_failures": True,
            "num_workers": 12,  # One worker per agent. You can increase this but it will run fewer parallel trainings.
            "num_envs_per_worker": 1,
            "num_gpus": 1,  # I yet have to understand if using a GPU is worth it, for our purposes, but I think it's not. This way you can train on a non-gpu enabled system.
            "clip_rewards": True,
            # "lr": 1e-4,  # Hyperparameter grid search defined above
            # "gamma": 0.99,  # This can have a big impact on the result and needs to be properly tuned (range is 0 to 1)
            # "lambda": 1.0,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [256, 256],  # Hyperparameter grid search defined above
                "use_attention": True,
                "attention_use_n_prev_actions": 64,
                "attention_use_n_prev_rewards": 64,
                "vf_share_layers": True,
            },
            #"num_sgd_iter": 10,  # tune.choice([10, 20, 30]),
            "sgd_minibatch_size": 1024, # 128  # tune.choice([128, 512, 2048]),
            "train_batch_size": 32768, # , # 1024 # tune.choice([10000, 20000, 40000]),
            "evaluation_interval": 1,  # Run evaluation on every iteration
            "vf_clip_param": 300000, 
            "evaluation_config": {
                "env_config": config_eval,  # The dictionary we built before (only the overriding keys to use in evaluation)
                "explore": False,  # We don't want to explore during evaluation. All actions have to be repeatable.
            },
        }

它训练正常,但是当我尝试使用它进行评估时,一切都崩溃了:

agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})

我也尝试过只传入观测值(我认为这才是正确的用法:让开发人员自己去弄清楚代理需要怎样的先前(并不存在的)状态是没有道理的;如果训练时能自动处理,评估时也应该能自动处理),但这样也不行。

这是错误的完整跟踪。

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_348/535668899.py in <module>
----> 1 agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\agents\trainer.py in compute_single_action(self, observation, state, prev_action, prev_reward, info, input_dict, policy_id, full_fetch, explore, timestep, episode, unsquash_action, clip_action, unsquash_actions, clip_actions, **kwargs)
   1483         if input_dict is not None:
   1484             input_dict[SampleBatch.OBS] = observation
-> 1485             action, state, extra = policy.compute_single_action(
   1486                 input_dict=input_dict,
   1487                 explore=explore,

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\policy.py in compute_single_action(self, obs, state, prev_action, prev_reward, info, input_dict, episode, explore, timestep, **kwargs)
    216             episodes = [episode]
    217 
--> 218         out = self.compute_actions_from_input_dict(
    219             input_dict=SampleBatch(input_dict),
    220             episodes=episodes,

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\torch_policy.py in compute_actions_from_input_dict(self, input_dict, explore, timestep, **kwargs)
    292                 if state_batches else None
    293 
--> 294             return self._compute_action_helper(input_dict, state_batches,
    295                                                seq_lens, explore, timestep)
    296 

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\utils\threading.py in wrapper(self, *a, **k)
     19         try:
     20             with self._lock:
---> 21                 return func(self, *a, **k)
     22         except AttributeError as e:
     23             if "has no attribute '_lock'" in e.args[0]:

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\policy\torch_policy.py in _compute_action_helper(self, input_dict, state_batches, seq_lens, explore, timestep)
    932             else:
    933                 dist_class = self.dist_class
--> 934                 dist_inputs, state_out = self.model(input_dict, state_batches,
    935                                                     seq_lens)
    936 

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\models\modelv2.py in __call__(self, input_dict, state, seq_lens)
    241 
    242         with self.context():
--> 243             res = self.forward(restored, state or [], seq_lens)
    244 
    245         if isinstance(input_dict, SampleBatch):

~\anaconda3\envs\cryptorl\lib\site-packages\ray\rllib\models\torch\attention_net.py in forward(self, input_dict, state, seq_lens)
    345                 state: List[TensorType],
    346                 seq_lens: TensorType) -> (TensorType, List[TensorType]):
--> 347         assert seq_lens is not None
    348         # Push obs through "unwrapped" net's `forward()` first.
    349         wrapped_out, _ = self._wrapped_forward(input_dict, [], None)

AssertionError: 

版本/依赖关系

ray '2.0.0.dev0' python 3.8 Windows 10

复现脚本

import ray
from ray.rllib.agents import ppo
from ray.tune.registry import register_env
from ray.rllib.agents.ppo import DEFAULT_CONFIG
import gym
config = DEFAULT_CONFIG.copy()
config.update(
    {
            "env":  "CartPole-v0", #
            # "env_config": config_train,  # The dictionary we built before
            "log_level": "WARNING",
            "framework": "torch",
            "_fake_gpus": False,
            "callbacks": MyCallback,
            "ignore_worker_failures": True,
            "num_workers": 12,  # One worker per agent. You can increase this but it will run fewer parallel trainings.
            "num_envs_per_worker": 1,
            "num_gpus": 1,  # I yet have to understand if using a GPU is worth it, for our purposes, but I think it's not. This way you can train on a non-gpu enabled system.
            "clip_rewards": True,
            # "lr": 1e-4,  # Hyperparameter grid search defined above
            # "gamma": 0.99,  # This can have a big impact on the result and needs to be properly tuned (range is 0 to 1)
            # "lambda": 1.0,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [256, 256],  # Hyperparameter grid search defined above
                "use_attention": True,
                "attention_use_n_prev_actions": 64,
                "attention_use_n_prev_rewards": 64,
                "vf_share_layers": True,
            },
            #"num_sgd_iter": 10,  # tune.choice([10, 20, 30]),
            "sgd_minibatch_size": 1024, # 128  # tune.choice([128, 512, 2048]),
            "train_batch_size": 32768, # , # 1024 # tune.choice([10000, 20000, 40000]),
            "evaluation_interval": 1,  # Run evaluation on every iteration
            "vf_clip_param": 300000, 
        }
)

ray.init(num_gpus=1)
agent = ppo.PPOTrainer(config=config, env="CartPole-v0")
env = gym.make("CartPole-v0")

episode_reward = 0
done = False
obs = env.reset()
agent.compute_single_action(input_dict={"obs":obs, "state":[0]*64, "prev_action": 0, "prev_reward": 0})
# or
agent.compute_single_action(obs)

其他信息

@deanwampler @ericl @richardliaw

你愿意提交PR吗?

  • [ ] 是的,我愿意提交PR!

May I ask if you have resolved it? Thank you!