Open RocketRider opened 1 year ago
Maybe the issue is not in the preprocessor it self. Instead in the Policy serialize -> gym_space_to_dict method. Here the spaces in the dict are iterated over without sorting them.
I'm not sure if I understand "Should test the "write" method." can you be more specific
I am sorry for the inconvenience, Ignore my comment about the "write" method. It is already tested. The issue is somewhere else.
Here is the code to reproduce the issue:
"""
Example of a custom gym environment and model. Run this for a demo.
This example shows:
- using a custom environment
- using a custom model
- using Tune for grid search to try different learning rates
You can visualize experiment results in ~/ray_results using TensorBoard.
Run example with defaults:
$ python custom_env.py
For CLI options:
$ python custom_env.py --help
"""
import argparse
import os
import random
import gymnasium as gym
import numpy as np
import ray
from gymnasium.spaces import Discrete, Box
from ray import air, tune
from ray.rllib.algorithms.ppo import PPOTF1Policy
from ray.rllib.env.env_context import EnvContext
from ray.rllib.env.multi_agent_env import make_multi_agent
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.policy.policy import PolicySpec
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print
from ray.tune.registry import get_trainable_cls
tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()
parser = argparse.ArgumentParser()
parser.add_argument(
"--run", type=str, default="PPO", help="The RLlib-registered algorithm to use."
)
parser.add_argument(
"--framework",
choices=["tf", "tf2", "torch"],
default="tf",
help="The DL framework specifier.",
)
parser.add_argument(
"--as-test",
action="store_true",
help="Whether this script should be run as a test: --stop-reward must "
"be achieved within --stop-timesteps AND --stop-iters.",
)
parser.add_argument(
"--stop-iters", type=int, default=50, help="Number of iterations to train."
)
parser.add_argument(
"--stop-timesteps", type=int, default=100000, help="Number of timesteps to train."
)
parser.add_argument(
"--stop-reward", type=float, default=0.1, help="Reward at which we stop training."
)
parser.add_argument(
"--no-tune",
action="store_true",
help="Run without Tune using a manual train loop instead. In this case,"
"use PPO without grid search and no TensorBoard.",
)
parser.add_argument(
"--local-mode",
action="store_true",
help="Init Ray in local mode for easier debugging.",
)
class SimpleCorridor(gym.Env):
"""Example of a custom env in which you have to walk down a corridor.
You can configure the length of the corridor via the env config."""
@staticmethod
def get_obs(config: EnvContext):
space = gym.spaces.Dict()
# Causes Issue:
space["b"] = Box(0.0, config["corridor_length"], shape=(10,), dtype=np.float32)
space["a"] = Box(0.0, config["corridor_length"], shape=(1,), dtype=np.float32)
# Works:
# space["a"] = Box(0.0, config["corridor_length"], shape=(1,), dtype=np.float32)
# space["b"] = Box(0.0, config["corridor_length"], shape=(10,), dtype=np.float32)
return space
@staticmethod
def get_action_space():
return Discrete(2)
def __init__(self, config: EnvContext):
self.end_pos = config["corridor_length"]
self.cur_pos = 0
self.action_space = SimpleCorridor.get_action_space()
self.observation_space = SimpleCorridor.get_obs(config)
# Set the seed. This is only used for the final (reach goal) reward.
self.reset(seed=config.worker_index * config.num_workers)
def reset(self, *, seed=None, options=None):
random.seed(seed)
self.cur_pos = 0
return {"a": np.array([self.cur_pos], dtype=np.float32), "b": np.zeros((10,), dtype=np.float32)}, {}
def step(self, action):
assert action in [0, 1], action
if action == 0 and self.cur_pos > 0:
self.cur_pos -= 1
elif action == 1:
self.cur_pos += 1
done = truncated = self.cur_pos >= self.end_pos
# Produce a random reward when we reach the goal.
return (
{"a": np.array([self.cur_pos], dtype=np.float32), "b": np.zeros((10,), dtype=np.float32)},
random.random() * 2 if done else -0.1,
done,
truncated,
{},
)
class CustomModel(TFModelV2):
"""Example of a keras custom model that just delegates to an fc-net."""
def __init__(self, obs_space, action_space, num_outputs, model_config, name):
super(CustomModel, self).__init__(
obs_space, action_space, num_outputs, model_config, name
)
self.model = FullyConnectedNetwork(
obs_space, action_space, num_outputs, model_config, name
)
def forward(self, input_dict, state, seq_lens):
return self.model.forward(input_dict, state, seq_lens)
def value_function(self):
return self.model.value_function()
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
return "0"
if __name__ == "__main__":
args = parser.parse_args()
print(f"Running with following CLI options: {args}")
ray.init(local_mode=args.local_mode)
# Can also register the env creator function explicitly with:
# register_env("corridor", lambda config: SimpleCorridor(config))
ModelCatalog.register_custom_model(
"my_model", CustomModel
)
env_config = {"corridor_length": 5}
config = (
get_trainable_cls(args.run)
.get_default_config()
# or "corridor" if registered above
.environment(make_multi_agent(SimpleCorridor), env_config=env_config)
.framework(args.framework)
.rollouts(num_rollout_workers=1)
.multi_agent(policies={
"0": PolicySpec(PPOTF1Policy, observation_space=SimpleCorridor.get_obs(env_config),
action_space=SimpleCorridor.get_action_space(),
config={"model": {
"custom_model": "my_model",
"vf_share_layers": True,
}})}, policy_mapping_fn=policy_mapping_fn)
.training(
)
# Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
.resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")))
)
stop = {
"training_iteration": args.stop_iters,
"timesteps_total": args.stop_timesteps,
"episode_reward_mean": args.stop_reward,
}
if args.no_tune:
# manual training with train loop using PPO and fixed learning rate
if args.run != "PPO":
raise ValueError("Only support --run PPO with --no-tune.")
print("Running manual train loop without Ray Tune.")
# use fixed learning rate instead of grid search (needs tune)
config.lr = 1e-3
algo = config.build()
# run manual training loop and print results after each iteration
for _ in range(args.stop_iters):
result = algo.train()
print(pretty_print(result))
# stop training of the target train steps or reward are reached
if (
result["timesteps_total"] >= args.stop_timesteps
or result["episode_reward_mean"] >= args.stop_reward
):
break
algo.stop()
else:
# automated run with Tune and grid search and TensorBoard
print("Training automatically with Ray Tune")
tuner = tune.Tuner(
args.run,
param_space=config.to_dict(),
run_config=air.RunConfig(stop=stop),
)
results = tuner.fit()
if args.as_test:
print("Checking if learning goals were achieved")
check_learning_achieved(results, args.stop_reward)
ray.shutdown()
You can change the code in "get_obs" to change the order of the OBS. The ordering should not impact the result, but it leads to the following crash:
(PPO pid=26112) 2023-03-17 07:22:58,445 WARNING util.py:67 -- Install gputil for GPU system monitoring.
(PPO pid=26112) 2023-03-17 07:22:58,467 ERROR actor_manager.py:496 -- Ray error, taking actor 1 out of service. ray::RolloutWorker.apply() (pid=5636, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x0000022476DB3550>)
(PPO pid=26112) File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
(PPO pid=26112) File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
(PPO pid=26112) File "python\ray\_raylet.pyx", line 803, in ray._raylet.execute_task.function_executor
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
(PPO pid=26112) return method(__ray_actor, *args, **kwargs)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
(PPO pid=26112) return method(self, *_args, **_kwargs)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\utils\actor_manager.py", line 183, in apply
(PPO pid=26112) raise e
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\utils\actor_manager.py", line 174, in apply
(PPO pid=26112) return func(self, *args, **kwargs)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\execution\rollout_ops.py", line 86, in <lambda>
(PPO pid=26112) lambda w: w.sample(), local_worker=False, healthy_only=True
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
(PPO pid=26112) return method(self, *_args, **_kwargs)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 914, in sample
(PPO pid=26112) batches = [self.input_reader.next()]
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\sampler.py", line 92, in next
(PPO pid=26112) batches = [self.get_data()]
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\sampler.py", line 277, in get_data
(PPO pid=26112) item = next(self._env_runner)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\env_runner_v2.py", line 323, in run
(PPO pid=26112) outputs = self.step()
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\env_runner_v2.py", line 349, in step
(PPO pid=26112) active_envs, to_eval, outputs = self._process_observations(
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\evaluation\env_runner_v2.py", line 616, in _process_observations
(PPO pid=26112) processed = policy.agent_connectors(acd_list)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\connectors\agent\pipeline.py", line 41, in __call__
(PPO pid=26112) ret = c(ret)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\connectors\connector.py", line 254, in __call__
(PPO pid=26112) return [self.transform(d) for d in acd_list]
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\connectors\connector.py", line 254, in <listcomp>
(PPO pid=26112) return [self.transform(d) for d in acd_list]
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\connectors\agent\obs_preproc.py", line 58, in transform
(PPO pid=26112) d[SampleBatch.NEXT_OBS] = self._preprocessor.transform(
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\models\preprocessors.py", line 287, in transform
(PPO pid=26112) self.write(observation, array, 0)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\models\preprocessors.py", line 299, in write
(PPO pid=26112) p.write(o, array, offset)
(PPO pid=26112) File "C:\Users\test\AppData\Local\miniconda3\envs\project_env\lib\site-packages\ray\rllib\models\preprocessors.py", line 211, in write
(PPO pid=26112) array[offset : offset + self._size] = np.array(observation, copy=False).ravel()
(PPO pid=26112) ValueError: could not broadcast input array from shape (10,) into shape (1,)
2023-03-17 07:22:59,175 ERROR tune.py:794 -- Trials did not complete: [PPO_MultiEnv_1f1bd_00000]
2023-03-17 07:22:59,176 INFO tune.py:798 -- Total run time: 17.70 seconds (17.00 seconds for the tuning loop).
Process finished with exit code 0
@avnishn could you look into this issue?
I added the repo-script, please take a look.
I am still seeing this issue in Ray 2.7.1: https://github.com/ray-project/ray/blob/master/rllib/models/preprocessors.py#L330
What happened + What you expected to happen
When using a dict action space with different Spaces inside the ordering of the OBS in DictFlatteningPreprocessor is taken into account. So at the moment I need to pass the observation space definition in the exact same order as the real OBS later on. And the "write" method is ordering the real OBS alphabetically if it is not a OrderedDict. So as my workaround I need to pass OrderedDict in exact same order as the definition. Or the definition must be done in alphabetical order.
Proposed solution:
Versions / Dependencies
Ray 2.3.0
Reproduction script
See comment below
Issue Severity
Medium: It is a significant difficulty but I can work around it.