Closed charleswangyanan closed 7 months ago
The purpose of SimpleEnv was to avoid the lengthy testing times caused by importing MuJoCo during the code inspection stage in GitHub Actions. We use this simple yet meaningless environment to check whether there are any bugs in the algorithmic implementation of our codebase. SimpleEnv has not been integrated into the officially supported environments of OmniSafe; it merely exists as a test case. Integrating SimpleEnv into OmniSafe officially as an example could better facilitate user customization of environment interfaces, which may also resolve your question 2. This will be supported in subsequent versions.

Thank you for your answer. When I define my environment according to the examples provided in omnisafe/envs/safety_gymnasium_env as follows and run train_policy.py, the environment cannot be found: AssertionError: Custom0-v0 doesn't exist.
```python
@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Custom0-v0',
        'Custom1-v0',
        'Custom2-v0',
    ]

    # (inside __init__)
    self._env = custom_env.make(env_id=env_id, **kwargs)
```
Have you imported your custom environment in omnisafe/envs/__init__.py? You can try:

```python
from omnisafe.envs.custom_env import CustomEnv
```

to resolve that issue.
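If it helps, a quick way to check whether registration took effect is to query `support_envs`, the same helper that `OnlineAdapter` asserts against; a minimal sketch, assuming your class lives in `omnisafe/envs/custom_env.py`:

```python
# Importing the envs package runs omnisafe/envs/__init__.py, which executes
# every @env_register decorator found there.
import omnisafe.envs  # noqa: F401
from omnisafe.envs.core import support_envs

# If the import line is in place, your custom ids should show up here.
print('Custom0-v0' in support_envs())
```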
Oh, I have added `from omnisafe.envs.custom_env import CustomEnv` in omnisafe/envs/__init__.py, but why is env_id still missing? Thank you very much again...
I tried writing code in a similar way to your screenshots, but unfortunately, I was unable to reproduce the same error. I will share my steps and source code below, and I hope it can be of help to you:
- Create a new file omnisafe/envs/custenv.py with the following code:
```python
from __future__ import annotations

from typing import Any, ClassVar

import torch

from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU


@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Custom0-v0',
        'Custom1-v0',
        'Custom2-v0',
    ]

    def __init__(
        self,
        env_id: str,
        num_envs: int = 1,
        device: torch.device = DEVICE_CPU,
        **kwargs: Any,
    ) -> None:
        super().__init__(env_id)
        self._num_envs = num_envs
        self._device = torch.device(device)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict[str, Any]]:
        pass

    def step(self):
        pass

    def set_seed(self, seed: int) -> None:
        pass

    def sample_action(self) -> torch.Tensor:
        pass

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass
```
- Import CustomEnv in ``omnisafe/envs/__init__.py``
```python
from omnisafe.envs.core import CMDP, env_register, make, support_envs
from omnisafe.envs.mujoco_env import MujocoEnv
from omnisafe.envs.safety_gymnasium_env import SafetyGymnasiumEnv
from omnisafe.envs.safety_gymnasium_modelbased import SafetyGymnasiumModelBased
from omnisafe.envs.custenv import CustomEnv
```

- Run `python train_policy.py --algo PPO --env-id Custom0-v0` in the terminal.
As there is nothing in this empty environment, I exit the code in omnisafe/adapter/online_adapter as soon as it successfully makes the environment:
```python
class OnlineAdapter:
    """Online Adapter for OmniSafe.

    OmniSafe is a framework for safe reinforcement learning. It is designed to be compatible with
    any existing RL algorithms. The online adapter is used to adapt the environment to the
    framework.

    Args:
        env_id (str): The environment id.
        num_envs (int): The number of parallel environments.
        seed (int): The random seed.
        cfgs (Config): The configuration.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        env_id: str,
        num_envs: int,
        seed: int,
        cfgs: Config,
    ) -> None:
        """Initialize an instance of :class:`OnlineAdapter`."""
        assert env_id in support_envs(), f'Env {env_id} is not supported.'

        self._cfgs: Config = cfgs
        self._device: torch.device = get_device(cfgs.train_cfgs.device)
        self._env_id: str = env_id
        self._env: CMDP = make(env_id, num_envs=num_envs, device=self._device)
        exit()
```
The code served me well and there was no TypeError.
Thank you a lot! I found that the key is to run from the terminal. Running from the terminal now successfully picks up my environment. I used Spyder before, but it gave the "did not have env_id" error. I don't know if it is related to the Windows system I used; I did not use Ubuntu. In addition, my new question is that my observation is reported as not being a Box. Why? I think my observation is a Box: it is 24-hour data normalized to 0-1 (one data point per hour, so 24 numbers, which are continuous after normalization). The general environment is as follows:
```python
@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Cust0-v0',
        'Cust1-v0',
        'Cust2-v0',
    ]

    def __init__(
        self,
        env_id: str,
        num_envs: int = 1,
        device: torch.device = DEVICE_CPU,
        **kwargs: Any,
    ) -> None:
        super().__init__(env_id)
        self._num_envs = num_envs
        self._device = torch.device(device)
        self._observation_space = spaces.Box(low=0, high=1.0, shape=(13,))
        self._action_space = spaces.Box(low=0, high=1.0, shape=(4,))
        self._coordinate_observation_space = spaces.Box(low=0, high=1.0, shape=(13,))
        self.df = pd.read_csv('data.csv')

    def normalization(self, data):
        _range = np.max(data) - np.min(data)
        return (data - np.min(data)) / _range

    def get(self):
        self.price_all = np.array([self.df.loc[:, 'Price'].values])
        self.P_d_all = np.array([self.df.loc[:, 'Demand'].values])
        self.price_all = self.normalization(self.price_all)
        self.P_d_all = self.normalization(self.P_d_all)

    def _next_observation(self):
        m = self.price_all[:, self.current_step : self.current_step + 4]
        n = self.P_d_all[:, self.current_step : self.current_step + 4]
        obs = np.append(y, self.SOC)
        return obs

    def step(self, action):
        self.current_step += 1
        obs = self._next_observation()
        self.price = self.df.loc[self.current_step, 'Price']
        self.P_b = action[1] * (MAX_P_B - MIN_P_B) + MIN_P_B
        reward = self.price * self.P_b
        cost = torch.as_tensor(random.random())
        terminated = self.current_step == self.iterations
        truncated = torch.as_tensor(self._count > 10)
        info = {}
        return obs, reward, cost, terminated, truncated, info

    def reset(self):
        self.P_d = 0
        self.price = 0
        self.P_b = 0
        self.current_step = 0
```
I have tried your code again, and I used the following steps:
- Create cust_env.py like what you provided:
```python
from __future__ import annotations

import random
from typing import Any, ClassVar

import numpy as np
import torch
from gymnasium.spaces import Box

from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU


@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Cust0-v0',
        'Cust1-v0',
        'Cust2-v0',
    ]
    need_auto_reset_wrapper = True
    need_time_limit_wrapper = False
    need_action_repeat_wrapper = True

    def __init__(
        self,
        env_id: str,
        num_envs: int = 1,
        device: torch.device = DEVICE_CPU,
        **kwargs: Any,
    ) -> None:
        super().__init__(env_id)
        self._num_envs = num_envs
        self._device = torch.device(device)
        self._observation_space = Box(low=0, high=1.0, shape=(13,))
        self._action_space = Box(low=0, high=1.0, shape=(4,))
        self._coordinate_observation_space = Box(low=0, high=1.0, shape=(13,))
        # self.df = pd.read_csv('data.csv')

    def normalization(self, data):
        _range = np.max(data) - np.min(data)
        return (data - np.min(data)) / _range

    def get(self):
        self.price_all = np.array([self.df.loc[:, 'Price'].values])
        self.P_d_all = np.array([self.df.loc[:, 'Demand'].values])
        self.price_all = self.normalization(self.price_all)
        self.P_d_all = self.normalization(self.P_d_all)

    def _next_observation(self):
        m = self.price_all[:, self.current_step : self.current_step + 4]
        n = self.P_d_all[:, self.current_step : self.current_step + 4]
        obs = np.append(n, self.SOC)
        return obs

    def step(self, action):
        self.current_step += 1
        obs = self._next_observation()
        self.price = self.df.loc[self.current_step, 'Price']
        self.P_b = action[1]
        reward = self.price * self.P_b
        cost = torch.as_tensor(random.random())
        terminated = self.current_step == self.iterations
        truncated = torch.as_tensor(self._count > 10)
        info = {}
        return obs, reward, cost, terminated, truncated, info

    def reset(self):
        self.P_d = 0
        self.price = 0
        self.P_b = 0
        self.current_step = 0

    def set_seed(self, seed: int) -> None:
        pass

    def sample_action(self) -> torch.Tensor:
        pass

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass
```
- Add a breakpoint in ``omnisafe/envs/wrapper``
```python
class ObsNormalize(Wrapper):
    """Normalize the observation.

    Examples:
        >>> env = ObsNormalize(env)
        >>> norm = Normalizer(env.observation_space.shape)  # load saved normalizer
        >>> env = ObsNormalize(env, norm)

    Args:
        env (CMDP): The environment to wrap.
        device (torch.device): The torch device to use.
        norm (Normalizer or None, optional): The normalizer to use. Defaults to None.
    """

    def __init__(self, env: CMDP, device: torch.device, norm: Normalizer | None = None) -> None:
        """Initialize an instance of :class:`ObsNormalize`."""
        super().__init__(env=env, device=device)
        assert isinstance(self.observation_space, spaces.Box), 'Observation space must be Box'
        print('successfully initiate environment!')
        exit()
        self._obs_normalizer: Normalizer
        if norm is not None:
            self._obs_normalizer = norm.to(self._device)
        else:
            self._obs_normalizer = Normalizer(self.observation_space.shape, clip=5).to(self._device)
```
Then, I obtained a bug-free result:
Unfortunately, I cannot reproduce your issue, but I hope the above information can help you to some degree. Additionally, I noticed that the indentation of the reset function is wrong, and the variable y is not defined in _next_observation(). I don't know whether those problems are the key.
Good evening, I followed the guide you provided step by step. Having encountered the same errors as in this issue, I'm sharing my code and the error I'm seeing. Thank you in advance for your help.

```python
from __future__ import annotations

from typing import Any, ClassVar

import torch

from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU


@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Custom0-v0',
        'Custom1-v0',
        'Custom2-v0',
    ]

    def __init__(
        self,
        env_id: str,
        num_envs: int = 1,
        device: torch.device = DEVICE_CPU,
        **kwargs: Any,
    ) -> None:
        super().__init__(env_id)
        self._num_envs = num_envs
        self._device = torch.device(device)
        self._env = custom_env.make(env_id=env_id, **kwargs)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict[str, Any]]:
        pass

    def step(self):
        pass

    def set_seed(self, seed: int) -> None:
        pass

    def sample_action(self) -> torch.Tensor:
        pass

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass
```
The error I'm getting is: AssertionError: Custom1-v0 doesn't exist., even though I have added the line `from omnisafe.envs.custom_env import CustomEnv` to __init__.py.
@NoemiF99 Hello. Your issue seems a little bit disorganized. Could you give us more detailed error messages, file structures, etc., so we can help you better identify the cause?
Thank you for your kind reply; it helps me more than you can imagine. I have a new problem and need your help.
```python
self._observation_space = Box(low=0, high=1.0, shape=(8,), dtype=np.float32)
self._action_space = Box(low=0, high=1.0, shape=(1,), dtype=np.float32)
```
Although we have set the above in our environment, why does the printed action exceed the range 0-1? What else should be set to guarantee that the observation and action stay in 0-1? The observation range is limited to 0-1 because we introduce the following normalization in our environment; if we delete it, the observation also exceeds the range 0-1. So what is the function of omnisafe/common/normalizer.py? Thank you ~
```python
def normalization(self, data):
    _range = np.max(data) - np.min(data)
    return (data - np.min(data)) / _range
```
@NoemiF99 The import should follow `from omnisafe.envs.<filename> import <class name>`, so it may be `from omnisafe.envs.custenv import CustomEnv`. Or you can try running from the terminal, because Spyder, which I used on Windows, gave me more errors.
@charleswangyanan Thanks for your recognition and your helpful response. The design of OmniSafe's ObsNormalize wrapper references the sixth of the nine tricks related to continuous action space environments in The 37 Implementation Details of Proximal Policy Optimization, aiming to improve the performance of reinforcement learning algorithms. Regarding the action constraint issue you presented, you could try using the ActionScale wrapper in the omnisafe/envs/wrapper.py file to map the actions. Additionally, the components provided by OmniSafe are designed to enhance the performance of reinforcement learning algorithms and are not tools for designing environments.
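For reference, the trick that ObsNormalize implements is a running mean/std whitening of observations. A minimal sketch of the idea (an illustration, not OmniSafe's actual Normalizer) might be:

```python
from __future__ import annotations

import torch

class RunningObsNorm:
    """Keep a running mean/variance of observations and whiten each new one."""

    def __init__(self, shape: tuple[int, ...], clip: float = 5.0) -> None:
        self.mean = torch.zeros(shape)
        self.var = torch.ones(shape)
        self.count = 1e-4
        self.clip = clip

    def __call__(self, obs: torch.Tensor) -> torch.Tensor:
        # Update the running statistics with the new observation.
        self.count += 1
        delta = obs - self.mean
        self.mean += delta / self.count
        self.var += (delta * (obs - self.mean) - self.var) / self.count
        # Whiten and clip, as in the PPO implementation-details trick.
        return torch.clamp((obs - self.mean) / torch.sqrt(self.var + 1e-8), -self.clip, self.clip)

norm = RunningObsNorm(shape=(8,))
print(norm(torch.rand(8)))
```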
How do I add the ActionScale wrapper specifically? In my environment or in train_policy.py? If I add the following in my environment, how should env be defined?

```python
from omnisafe.envs.wrapper import ActionScale

env = ActionScale(env, low=0, high=1)
```
You can refer to omnisafe/adapter/online_adapter for more details. Our implementation is:
```python
def _wrapper(
    self,
    obs_normalize: bool = True,
    reward_normalize: bool = True,
    cost_normalize: bool = True,
) -> None:
    # Some code here.
    self._env = ActionScale(self._env, low=-1.0, high=1.0, device=self._device)
    self._eval_env = ActionScale(self._eval_env, low=-1.0, high=1.0, device=self._device)
    if self._env.num_envs == 1:
        self._env = Unsqueeze(self._env, device=self._device)
        self._eval_env = Unsqueeze(self._eval_env, device=self._device)
```
It seems your implementation is basically the same, so I think it is ok.
If I add the following in my environment, how should self._env be defined? `self._env = ActionScale(self._env, low=-1.0, high=1.0, device=self._device)` Or is the action range decided by omnisafe/adapter/online_adapter? I modified [-1, 1] to [0, 1] in def _wrapper() in omnisafe/adapter/online_adapter, but there is no change.
The self._env defined in omnisafe/adapter/online_adapter is an instantiated object of a specific environment class. For example, you can wrap your environment by:

```python
env = CustomEnv(env_id='Custom1-v0', num_envs=1)
env = ActionScale(env, low=0, high=1.0)
```
Yes, of course. I am using Docker to implement autonomous drone navigation through safe deep reinforcement learning algorithms. I have installed OmniSafe, and before using my own environment, I am trying to create custom environments. I followed the guide and created a new Python file at /omnisafe/envs/custom_env.py. The custom_env.py file is as follows:
```python
from __future__ import annotations

from typing import Any, ClassVar

import torch

from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU


@env_register
class CustomEnv(CMDP):
    _support_envs: ClassVar[list[str]] = [
        'Custom0-v0',
        'Custom1-v0',
        'Custom2-v0',
    ]

    def __init__(
        self,
        env_id: str,
        num_envs: int = 1,
        device: torch.device = DEVICE_CPU,
        **kwargs: Any,
    ) -> None:
        super().__init__(env_id)
        self._num_envs = num_envs
        self._device = torch.device(device)
        self._env = custom_env.make(env_id=env_id, **kwargs)

    def reset(
        self,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[torch.Tensor, dict[str, Any]]:
        pass

    def step(self):
        pass

    def set_seed(self, seed: int) -> None:
        pass

    def sample_action(self) -> torch.Tensor:
        pass

    def render(self) -> Any:
        pass

    def close(self) -> None:
        pass
```
After that, I modified the __init__.py file in the /omnisafe/envs/ folder as follows:
```python
from omnisafe.envs.core import CMDP, env_register, make, support_envs
from omnisafe.envs.mujoco_env import MujocoEnv
from omnisafe.envs.safety_gymnasium_env import SafetyGymnasiumEnv
from omnisafe.envs.safety_gymnasium_modelbased import SafetyGymnasiumModelBased
from omnisafe.envs.custom_env import CustomEnv
```
After running the following command in my terminal: python3 train_policy.py --algo PPO --env-id Custom0-v0 I get the following error: I hope my issue is clearer now. Thank you very much in advance.
@NoemiF99 Thanks to the detailed terminal information you provided, I think I know how to solve your problem. Your omnisafe run path is catkin_ws/src/omnisafe, and the error message indicates your code run path as usr/local/lib/python3.8/.... This explains why you still get errors, even after following my steps. I suspect that the reason is that the file you modified is located at catkin_ws/src/omnisafe, but the file being executed is at usr/local/lib/python3.8/..., meaning your changes did not take effect on the running code. Since you need to customize omnisafe, I suggest you modify the omnisafe code directly in the usr/local/lib/python3.8/... path.
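As a quick sanity check, you can ask Python which copy of omnisafe it is actually importing; a small diagnostic sketch (standard Python, not an OmniSafe API):

```python
import os

import omnisafe

# Prints the directory of the omnisafe package that Python actually imports;
# if this points inside usr/local/lib/python3.8/..., that is the copy you need to edit.
print(os.path.dirname(omnisafe.__file__))
```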
Okay, thank you. Honestly, I don't understand why I keep getting an error even though I use @env_register to register my custom environment. In the algo_wrapper.py code in the directory usr/local/lib/python3.8/dist-packages/omnisafe, at lines 72 and 140, it simply tells me that the environment has not been registered correctly. Is there any other step I should take to create and register my custom environment?
@Gaiejj Taking a closer look at the problem reported by @charleswangyanan, I encountered the error in the same lines of code in the algo_wrapper.py file. I would like to understand how @charleswangyanan resolved it so that I can apply it to my code. Thank you very much.
I mean, you can place the custom_env.py file in usr/local/lib/python3.8/dist-packages/omnisafe/omnisafe/envs, then import CustomEnv in usr/local/lib/python3.8/dist-packages/omnisafe/omnisafe/envs/__init__.py. I sincerely hope this can help you register your environment.
@Gaiejj It seems that by doing it this way, the error related to environment registration is not there. Now I will try to implement my environment for autonomous drone guidance. Thank you very much for the help you have provided, you have been very kind and helpful. If there are any further issues regarding the implementation of my environment, I will write here.
Thanks a lot. I know the env should be instantiated and wrapped, but I don't know which .py file that should be placed in. For example, I placed the following in omnisafe/adapter/online_adapter, but the result still exceeds 0-1.
```python
from omnisafe.envs.custom_env import CustomEnv

self._env = CustomEnv(env_id='Custom0-v0', num_envs=1)
self._eval_env = CustomEnv(env_id='Custom0-v0', num_envs=1)
self._env = ActionScale(self._env, low=0, high=1.0, device=self._device)
self._eval_env = ActionScale(self._eval_env, low=-1.0, high=1.0, device=self._device)
```
My bad. I double-checked the code, and I think this step helped solve the action scale issue: `self._env = ActionScale(self._env, low=-1, high=1)` (instead of `self._env = ActionScale(self._env, low=0, high=1)`).
The ActionScale wrapper will map the action from the scale of the actor ($[L_{\text{actor}}, H_{\text{actor}}]$) to the scale of the environment ($[L_{\text{env}}, H_{\text{env}}]$). The $[L_{\text{env}}, H_{\text{env}}]$ action scale [0, 1] has already been defined in your omnisafe/envs/custom_env.py. Now the args you need to pass into the ActionScale wrapper (the low and high) should be $[L_{\text{actor}}, H_{\text{actor}}]$. If you use the actor of OmniSafe, just set it as [-1, 1], as OmniSafe actors always output in [-1, 1]. Or if you use your own actor whose output range is, for example, [-2, 2], you can set it as [-2, 2].

Sorry, I don't understand your meaning. Whatever the scale of the environment, after the actor of OmniSafe it will finally be changed to [-1, 1]? It can't be modified?
The ActionScale wrapper will map the action from the actor scale (defined by the args passed into the ActionScale wrapper) to the env scale (defined by the environment via self._action_space = ...). The flow is: actor --> ActionScale wrapper --> environment. So, without the ActionScale wrapper, the action will always be in the actor's action range. With it, it will be in the environment's action_space range.
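In other words, the wrapper applies an affine rescaling from the actor range to the environment range. A minimal sketch of that mapping (an illustration of the idea, not OmniSafe's actual ActionScale code):

```python
import torch

def scale_action(act: torch.Tensor, actor_low: float, actor_high: float,
                 env_low: torch.Tensor, env_high: torch.Tensor) -> torch.Tensor:
    """Map an action from the actor range [actor_low, actor_high] to [env_low, env_high]."""
    frac = (act - actor_low) / (actor_high - actor_low)   # position inside the actor range, in [0, 1]
    return env_low + frac * (env_high - env_low)          # re-anchor it inside the env action_space

# An OmniSafe actor outputs in [-1, 1]; 0.0 maps to the middle of a [0, 1] action space.
print(scale_action(torch.tensor(0.0), -1.0, 1.0, torch.tensor(0.0), torch.tensor(1.0)))
```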
Good evening. I'm encountering some issues while running my custom environment. First, I've defined my custom environment CustomEnv, which is used for implementing autonomous drone guidance through safe deep reinforcement learning algorithms. The code is as follows:
#! /usr/bin/python3
from __future__ import annotations
from typing import Any, ClassVar
import torch
import numpy as np
from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU, Box
from omnisafe.envs import custom_env
import rospy
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from mav_msgs.msg import Actuators
from omnisafe.Quad_RL.rotors_gazebo.scripts import ros_node
from omnisafe.Quad_RL.rotors_gazebo.scripts import ros_gazebo
from std_msgs.msg import Float32
from gazebo_msgs.msg import ModelStates
import csv
import rospkg
from gazebo_msgs.srv import SetModelState
from gazebo_msgs.msg import ModelState
from omnisafe.Quad_RL.rotors_gazebo.scripts.Position_controller import PositionController
N_ACTIONS = 3
SAMPLING_TIME = 0.01
LENGTH = 200 # 10 seconds
MIN_X = -400
MAX_X = 400
MIN_Y = -400
MAX_Y = 400
MIN_Z = 0
MAX_Z = 600
GOAL_MIN_X = -200
GOAL_MAX_X = 200
GOAL_MIN_Y = -200
GOAL_MAX_Y = 200
GOAL_MIN_Z = 200
GOAL_MAX_Z = 400
msg_mot = Actuators()
msg_motor = Float32()
msg_act = Actuators()
rospy.init_node('quadcopter_training', anonymous=True)
@env_register
class CustomEnv(CMDP):
_support_envs: ClassVar[list[str]] = [
'Custom0-v0',
'Custom1-v0',
'Custom2-v0',
]
need_auto_reset_wrapper = True
need_time_limit_wrapper = False
need_action_report_wrapper = True
def __init__(
self,
env_id: str,
num_envs: int = 1,
device: torch.device = DEVICE_CPU,
**kwargs: Any,
) -> None:
super().__init__(env_id)
self._num_envs = num_envs
self._device = torch.device(device)
#NO TILT ENV OBSERVATION
self._action_space=Box(low = np.array([-0.5, -0.5, -1.0]), high =np.array([0.5, 0.5, 1.0]), shape=(N_ACTIONS,), dtype=np.float64)
low = np.array([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.radians(180), -np.radians(180), -np.radians(180), 0, 0])
high = np.array([ np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.radians(180), np.radians(180), np.radians(180), 1, 1])
self._observation_space = Box(low=low, high=high, shape=(11,), dtype=np.float64)
rospy.Subscriber('/gazebo/model_states', ModelStates, self.Odometry_callback)
rospy.Subscriber('/ardrone/motor_speed/0', Float32, self.Motor0_callback)
rospy.Subscriber('/ardrone/motor_speed/1', Float32, self.Motor1_callback)
rospy.Subscriber('/ardrone/motor_speed/2', Float32, self.Motor2_callback)
rospy.Subscriber('/ardrone/motor_speed/3', Float32, self.Motor3_callback)
self.commands_pub = rospy.Publisher('/ardrone/command/motor_speed', Actuators, queue_size=1)
self.start_time = None
self.filecsv = 'RANDOMYAW_TEST.csv'
self.dati = []
#self.make_csv()
self.PositionController = PositionController()
self.PositionController.init_PIDs()
def Motor0_callback(self,data):
self.mot0 = data
def Motor1_callback(self,data):
self.mot1 = data
def Motor2_callback(self,data):
self.mot2 = data
def Motor3_callback(self,data):
self.mot3 = data
def Odometry_callback(self, data):
self.odometry = data
def get_RPY(self):
Quaternion = np.array([self.odometry.pose[1].orientation.x, self.odometry.pose[1].orientation.y, self.odometry.pose[1].orientation.z, self.odometry.pose[1].orientation.w])
self.roll, self.pitch, self.yaw = euler_from_quaternion(Quaternion)
self.roll *= 100
self.pitch *= 100
self.yaw *= 100
def get_Pose(self):
self.x = self.odometry.pose[1].position.x*100
self.y = self.odometry.pose[1].position.y*100
self.z = self.odometry.pose[1].position.z*100
def get_Velocity(self):
self.vx = self.odometry.twist[1].linear.x
self.vy = self.odometry.twist[1].linear.y
self.vz = self.odometry.twist[1].linear.z
self.p = self.odometry.twist[1].angular.x
self.q = self.odometry.twist[1].angular.y
self.r = self.odometry.twist[1].angular.z
def get_MotorsVel(self):
return self.mot0.data, self.mot1.data, self.mot2.data, self.mot3.data
def check_get_pose_ready(self):
self.odometry = None
rospy.logdebug("Waiting for /get_pose to be READY...")
while self.odometry is None and not rospy.is_shutdown():
try:
self.odometry = rospy.wait_for_message("/gazebo/model_states", ModelStates, timeout=5.0)
rospy.logdebug("Current /get_pose READY=>")
except:
rospy.logerr("Current /get_pose not ready yet, retrying for getting get_pose")
return self.odometry
def check_get_motors_ready(self):
self.mot0 = None
self.mot1 = None
self.mot2 = None
self.mot3 = None
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot0 is None and not rospy.is_shutdown():
try:
self.mot0 = rospy.wait_for_message('/ardrone/motor_speed/0', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot1 is None and not rospy.is_shutdown():
try:
self.mot1 = rospy.wait_for_message('/ardrone/motor_speed/1', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot2 is None and not rospy.is_shutdown():
try:
self.mot2 = rospy.wait_for_message('/ardrone/motor_speed/2', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot3 is None and not rospy.is_shutdown():
try:
self.mot4 = rospy.wait_for_message('/ardrone/motor_speed/3', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
return self.mot0, self.mot1, self.mot2, self.mot3
def step(self, action):
self.info = {}
start_time = rospy.Time.now().to_sec()
for k in range(5):
if self.start_time is not None:
delay = rospy.Time.now().to_sec() - self.start_time
if delay < SAMPLING_TIME:
rospy.sleep(SAMPLING_TIME - delay)
#rospy.logwarn('delay:{}'.format(SAMPLING_TIME - delay))
self.get_Velocity()
#self.get_Pose() # if PID
#thrust_cmd = self.PositionController.ZVelocityController(z_ref = 200, z = self.z, vz = self.vz) # if PID
self.random_yaw = np.random.uniform(-np.radians(180), np.radians(180))
thrust_cmd = self.PositionController.ZVelocityController(action, self.vz) #if RL
#self.get_Pose() # if PID
self.get_Velocity()
#roll_ref, pitch_ref, yaw_ref = self.PositionController.VelocityController(x = self.x, y = self.y, x_ref = 200, y_ref = 100, vx = self.vx, vy = self.vy, yaw = self.yaw) # if PID
roll_ref, pitch_ref, yaw_ref = self.PositionController.VelocityController(action,self.vx, self.vy, self.yaw, self.random_yaw) #if RL
self.get_RPY()
self.get_Velocity()
roll_cmd, pitch_cmd, yaw_cmd = self.PositionController.AttitudeController(roll_ref, pitch_ref, yaw_ref, self.roll, self.pitch, self.r,action,self.vx, self.vy, self.yaw,self.random_yaw)
W_FR, W_BL, W_FL, W_BR = self.PositionController.ControlMixer(roll_cmd, pitch_cmd, thrust_cmd, yaw_cmd,action,self.vx, self.vy,self.roll,self.pitch,self.yaw, self.r,roll_ref, pitch_ref, yaw_ref,self.random_yaw,self.vz)
self.send_commands(W_FR, W_BL, W_FL, W_BR)
self.start_time = rospy.Time.now().to_sec()
#dtRL = rospy.Time.now().to_sec() - start_time
#rospy.logwarn('dtRL:{}'.format(dtRL))
ros_gazebo.gazebo_pause_physics()
observation = self.get_observation()
done = self.is_done()
reward, info, done = self.get_reward(done,action)
ros_gazebo.gazebo_unpause_physics()
return observation, reward, done, info
def reset(self,
seed: int | None = None,
options: dict[str, Any] | None = None,
) -> tuple[torch.Tensor, dict[str, Any]]:
ros_gazebo.gazebo_reset_world()
ros_gazebo.gazebo_unpause_physics()
self.check_get_motors_ready()
self.check_get_pose_ready()
self.reset_variables()
self.set_goal() # random goals
#self.set_new_waypoint() # waypoints goals
self.reset_motors()
#self.set_quad_pose() # to start with random yaw -> set Position controller too
self.PositionController.reset_PIDs()
ros_gazebo.gazebo_pause_physics()
observation = self.get_observation()
ros_gazebo.gazebo_unpause_physics()
return observation,{}
def close (self):
rospy.loginfo("Closing Env")
rospy.signal_shutdown("Closing RobotGazeboEnvironment")
ros_gazebo.close_Gazebo()
ros_node.ros_kill_all_nodes()
ros_node.ros_kill_master()
ros_node.ros_kill_all_processes()
print("Closed ROS and Env")
def set_seed(self, seed: int) -> None:
pass
def sample_action(self) -> torch.Tensor:
pass
def render(self) -> Any:
pass
def send_commands(self,w1, w2, w3, w4):
global msg_mot
self.get_Pose()
msg_mot.angular_velocities.clear()
msg_mot.header.stamp = rospy.Time.now()
msg_mot.angular_velocities = [w1, w2, w3, w4]
self.commands_pub.publish(msg_mot)
#rospy.logwarn('Commands_Published:{}'.format(msg_mot.angular_velocities))
def get_reward(self,done,action):
c = 8
GOAL_BONUS = 0
CRASH_PENALTY = 0
DISTANCE_REWARD = -self.distance_to_goal/100
self.tilt = -np.linalg.norm(self.drone_angular)/c
TILT_PENALTY = self.tilt
#TILT_PENALTY = 0
self.terminated = False
if done == True:
CRASH_PENALTY = -100
self.reward = CRASH_PENALTY
self.info['is_success'] = False
self.terminated = True
elif done == False:
self.reward = DISTANCE_REWARD + TILT_PENALTY # TILT ENV
#self.reward = DISTANCE_REWARD # NO TILT ENV
if self.distance_to_goal < 0.15:
GOAL_BONUS = 100
self.reward = GOAL_BONUS
rospy.logerr('Goal Reached')
rospy.loginfo('Error:{}'.format(self.distance_to_goal))
self.info['is_success'] = True
self.set_goal()
#self.waypoint_index = (self.waypoint_index + 1) % len(self.waypoints)
#self.set_new_waypoint()
if self.counter_steps > LENGTH:
self.terminated = True
rospy.loginfo('Episode Terminated')
self.cumulated_reward += self.reward
'''R1 = DISTANCE_REWARD
self.crash = CRASH_PENALTY
self.bonus = GOAL_BONUS
R4 = TILT_PENALTY
TOTAL_REW = self.cumulated_reward
N_STEPS = self.counter_steps
DONE = self.terminated
ERRORE = self.distance_to_goal
X = self.x
Y = self.y
Z = self.z
VX = self.vx
VY = self.vy
VZ = self.vz
X_GOAL = self.random_goal[0]
Y_GOAL = self.random_goal[1]
Z_GOAL = self.random_goal[2]
ROLL = self.roll/100
PITCH = self.pitch/100
YAW = self.yaw/100
WX = self.p
WY = self.q
WZ = self.r
self.dati = [self.crash, self.bonus, N_STEPS, DONE, ERRORE, X, Y, Z, VX, VY, VZ, X_GOAL, Y_GOAL, Z_GOAL, ROLL, PITCH, YAW, WX, WY, WZ, action[0],action[1],action[2], R1, R4, TOTAL_REW]
self.add_row(self.dati)'''
return self.reward, self.info, self.terminated
def get_observation(self):
self.get_Pose()
self.get_Velocity()
self.get_RPY()
self.drone_position = np.array([self.x, self.y, self.z])
self.drone_velocity = np.array([self.vx, self.vy, self.vz])
self.drone_angular = np.array([self.p, self.q, self.r])
error_x = (self.random_goal[0] - self.x)/100
error_y = (self.random_goal[1] - self.y)/100
error_z = (self.random_goal[2] - self.z)/100
self.distance_to_goal = self.compute_distance_to_target(self.random_goal, self.drone_position)/100
goal_reached = self.is_goal_reached()
has_flipped = self.has_flipped()
if has_flipped == True:
crash = 1
else:
crash = 0
# TILT ENV OBS
obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.p, self.q, self.r,
self.roll/100, self.pitch/100, self.yaw/100,
goal_reached, crash])
# NO TILT ENV OBS
'''obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.roll/100, self.pitch/100, self.yaw/100,
goal_reached, crash])'''
# RANDOM YAW OBS
'''obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.p, self.q, self.r,
self.roll/100, self.pitch/100, self.random_yaw - self.yaw/100,
goal_reached, crash])'''
#rospy.logerr("Observations:{}".format(np.array([obs])))
return obs
def set_quad_pose(self):
model_state_msg = ModelState()
roll = 0
pitch = 0
self.random_yaw = np.random.uniform(-np.radians(180), np.radians(180))
x,y,z,w = quaternion_from_euler(roll,pitch,self.random_yaw)
model_state_msg.model_name = 'ardrone'
model_state_msg.pose.position.x = 0.0
model_state_msg.pose.position.y = 0.0
model_state_msg.pose.position.z = 0.1
model_state_msg.pose.orientation.x = x
model_state_msg.pose.orientation.y = y
model_state_msg.pose.orientation.z = z
model_state_msg.pose.orientation.w = w
set_model_state = rospy.ServiceProxy('/gazebo/set_model_state', SetModelState)
response = set_model_state(model_state_msg)
def is_done(self):
done = False
self.counter_steps +=1
has_flipped = self.has_flipped()
if has_flipped == True:
done = True
#if self.counter_steps == LENGTH:
#self.plot()
return done
def is_goal_reached(self):
if self.distance_to_goal < 0.15:
goal_reached = 1
else:
goal_reached = 0
return goal_reached
def set_goal(self):
x_goal = np.random.uniform(GOAL_MIN_X, GOAL_MAX_X)
y_goal = np.random.uniform(GOAL_MIN_Y, GOAL_MAX_Y)
z_goal = np.random.uniform(GOAL_MIN_Z, GOAL_MAX_Z)
self.random_goal = np.array([x_goal, y_goal, z_goal])
rospy.loginfo('New Goal:{}'.format(self.random_goal))
def set_new_waypoint(self):
# SQUARE TRAJECTORY WAYPOINTS
self.waypoints = [[0 , 0, 200], [100, -100, 200], [100, 100, 200], [-100, 100, 200], [-100, -100, 200], [0, 0, 200]]
x_goal = self.waypoints[self.waypoint_index][0]
y_goal = self.waypoints[self.waypoint_index][1]
z_goal = self.waypoints[self.waypoint_index][2]
self.random_goal = np.array([x_goal, y_goal, z_goal])
rospy.loginfo('New Goal:{}'.format(self.random_goal))
def compute_distance_to_target(self,a,b):
distance = np.linalg.norm(a-b)
return distance
def is_inside_workspace(self):
is_inside = False
self.get_Pose()
if self.x >= MIN_X and self.x <= MAX_X:
if self.y >= MIN_Y and self.y <= MAX_Y:
if self.z >= MIN_Z and self.z<= MAX_Z:
is_inside = True
return is_inside
def has_flipped(self):
self.get_RPY()
has_flipped = False
if not(-np.radians(90)*100 <= self.roll <= np.radians(90)*100) or not(-np.radians(90)*100 <= self.pitch <= np.radians(90)*100) :
has_flipped = True
return has_flipped
def reset_variables(self):
self.counter_steps = 0
self.cumulated_reward = 0.0
self.drone_position = None
self.drone_velocity = None
self.drone_angular = None
self.distance_to_goal = None
self.reward = 0.0
self.waypoint_index = 0
def reset_motors(self):
global msg_mot, msg_motor
w1,w2,w3,w4 = self.get_MotorsVel()
while not(round(w1,2) == 0.0 and round(w2,2) == 0.0 and round(w3,2) == 0.0 and round(w4,2) == 0.0):
msg_mot.angular_velocities.clear()
msg_mot.header.stamp = rospy.Time.now()
msg_mot.angular_velocities = [0,0,0,0]
self.commands_pub.publish(msg_mot)
w1,w2,w3,w4 = self.get_MotorsVel()
velocities = np.array([w1,w2,w3,w4])
#rospy.logwarn('Motors:{}'.format(np.array([velocities])))
def make_csv(self):
rospack = rospkg.RosPack()
pkg_path = rospack.get_path('rotors_gazebo')
intestazioni = ['CRASH', 'GOAL', 'N_STEPS', 'DONE', 'ERRORE', 'X', 'Y', 'Z', 'VX', 'VY', 'VZ', 'X_GOAL', 'Y_GOAL', 'Z_GOAL', 'ROLL', 'PITCH', 'YAW', 'WX', 'WY', 'WZ','VX_ACTION', 'VY_ACTION', 'VZ_ACTION', 'DISTANCE_REW', 'TILT_REW', 'TOTAL_REW', 'X_REF', 'Y_REF', 'Z_REF']
self.file_csv = pkg_path + '/Training/NewTests/' + self.filecsv
with open(self.file_csv, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(intestazioni)
def add_row(self, dati):
with open(self.file_csv, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow(dati)
When I run the command python3 train_policy.py --algo PPOLag --env-id Custom1-v0 I get the following error in the normalizer.py file:
I tried to resolve the error by checking the type of raw_data in normalizer.py, and it appears to be of type torch.Size([14]), but for normalization, it needs to be of type <class 'tuple'> like self._shape. Could you please help me resolve this error? Thank you in advance.
It's hard to provide specific tips. If your customized environment does not support OmniSafe's wrapper, or you prefer not to use it, you can set the obs_normalize parameter to false in the corresponding algorithm's yaml file, specifically omnisafe/configs/on-policy/PPOLag.yaml.
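If you launch training through the Python API instead of train_policy.py, the same switch can be passed as a custom config override. A minimal sketch using omnisafe.Agent, assuming obs_normalize sits under algo_cfgs as in the on-policy defaults (check your installed yaml if the nesting differs):

```python
import omnisafe

# 'Custom1-v0' is the id registered above; the algo_cfgs placement of obs_normalize
# is an assumption based on the on-policy config layout.
custom_cfgs = {
    'algo_cfgs': {'obs_normalize': False},
    'logger_cfgs': {'use_wandb': False},
}
agent = omnisafe.Agent('PPOLag', 'Custom1-v0', custom_cfgs=custom_cfgs)
agent.learn()
```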
Thank you very much. By changing the 'obs_normalize' parameter to False, I no longer see the previous error. Now, however, I get this error:
I would be grateful if you could help me resolve this issue. Thank you very much for your assistance.
You can refer to omnisafe/envs/safety_gymnasium_env.py: its step function converts all np.ndarray values to torch.Tensor.
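For example, a minimal sketch of that conversion at the end of a custom step (a hypothetical helper that mirrors the pattern rather than copying OmniSafe's exact code):

```python
import numpy as np
import torch

def as_step_tensors(obs, reward, cost, terminated, truncated, device):
    """Convert the numpy/scalar outputs of a custom step() into torch.Tensors."""
    return tuple(
        torch.as_tensor(x, dtype=torch.float32, device=device)
        for x in (obs, reward, cost, terminated, truncated)
    )

# Example with dummy values shaped like a (3,)-dim observation.
print(as_step_tensors(np.zeros(3), 1.0, 0.0, False, False, torch.device('cpu')))
```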
Good morning, I have implemented my custom environment with the aim of autonomous drone navigation using SDRL algorithms. After implementing my environment, I added a calculate_cost function to compute the cost. My code, which contains the definition of this function, is as follows:
from __future__ import annotations
from typing import Any, ClassVar
import torch
import numpy as np
from omnisafe.envs.core import CMDP, env_register
from omnisafe.typing import DEVICE_CPU, Box
from omnisafe.envs import custom_env
import rospy
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from mav_msgs.msg import Actuators
from omnisafe.Quad_RL.rotors_gazebo.scripts import ros_node
from omnisafe.Quad_RL.rotors_gazebo.scripts import ros_gazebo
from std_msgs.msg import Float32
from gazebo_msgs.msg import ModelStates
import csv
import rospkg
from gazebo_msgs.srv import SetModelState
from gazebo_msgs.msg import ModelState
from omnisafe.Quad_RL.rotors_gazebo.scripts.Position_controller import PositionController
N_ACTIONS = 3
SAMPLING_TIME = 0.01
LENGTH = 200 # 10 seconds
MIN_X = -400
MAX_X = 400
MIN_Y = -400
MAX_Y = 400
MIN_Z = 0
MAX_Z = 600
GOAL_MIN_X = -200
GOAL_MAX_X = 200
GOAL_MIN_Y = -200
GOAL_MAX_Y = 200
GOAL_MIN_Z = 200
GOAL_MAX_Z = 400
msg_mot = Actuators()
msg_motor = Float32()
msg_act = Actuators()
rospy.init_node('quadcopter_training', anonymous=True)
@env_register
class CustomEnv(CMDP):
_support_envs: ClassVar[list[str]] = [
'Custom0-v0',
'Custom1-v0',
'Custom2-v0',
]
need_auto_reset_wrapper = True
need_time_limit_wrapper = False
need_action_report_wrapper = True
def __init__(
self,
env_id: str,
num_envs: int = 1,
device: torch.device = DEVICE_CPU,
**kwargs: Any,
) -> None:
super().__init__(env_id)
self._num_envs = num_envs
self._device = torch.device(device)
self._action_space=Box(low = np.array([-0.5, -0.5, -1.0]), high =np.array([0.5, 0.5, 1.0]), shape=(N_ACTIONS,), dtype=np.float64)
# NO-TILT ENV OBSERVATION
#low = np.array([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.radians(180), -np.radians(180), -np.radians(180), 0, 0])
#high = np.array([ np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.radians(180), np.radians(180), np.radians(180), 1, 1])
# TITL ENV OBSERVATION
low = np.array([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf,-np.inf, -np.inf, -np.inf, -np.radians(180), -np.radians(180), -np.radians(180), 0, 0])
high = np.array([ np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.radians(180), np.radians(180), np.radians(180), 1, 1])
self._observation_space = Box(low=low, high=high, shape=(14,), dtype=np.float64) # change shape to 11 if NOTILT ENV
# #TILT ENV OBSERVATION 2
# low = np.array([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -np.radians(180), -np.radians(180), -np.radians(180), 0, 0, 0, 0, 0])
# high = np.array([ np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.radians(180), np.radians(180), np.radians(180), 1, 1, 1, 1, 1])
# self._observation_space = Box(low=low, high=high, shape=(14,), dtype=np.float64)
rospy.Subscriber('/gazebo/model_states', ModelStates, self.Odometry_callback)
rospy.Subscriber('/ardrone/motor_speed/0', Float32, self.Motor0_callback)
rospy.Subscriber('/ardrone/motor_speed/1', Float32, self.Motor1_callback)
rospy.Subscriber('/ardrone/motor_speed/2', Float32, self.Motor2_callback)
rospy.Subscriber('/ardrone/motor_speed/3', Float32, self.Motor3_callback)
self.commands_pub = rospy.Publisher('/ardrone/command/motor_speed', Actuators, queue_size=1)
self.start_time = None
self.filecsv = 'CPO_test.csv'
self.dati = []
#self.make_csv()
self.PositionController = PositionController()
self.PositionController.init_PIDs()
def Motor0_callback(self,data):
self.mot0 = data
def Motor1_callback(self,data):
self.mot1 = data
def Motor2_callback(self,data):
self.mot2 = data
def Motor3_callback(self,data):
self.mot3 = data
def Odometry_callback(self, data):
self.odometry = data
def get_RPY(self):
Quaternion = np.array([self.odometry.pose[1].orientation.x, self.odometry.pose[1].orientation.y, self.odometry.pose[1].orientation.z, self.odometry.pose[1].orientation.w])
self.roll, self.pitch, self.yaw = euler_from_quaternion(Quaternion)
self.roll *= 100
self.pitch *= 100
self.yaw *= 100
def get_Pose(self):
self.x = self.odometry.pose[1].position.x*100
self.y = self.odometry.pose[1].position.y*100
self.z = self.odometry.pose[1].position.z*100
def get_Velocity(self):
self.vx = self.odometry.twist[1].linear.x
self.vy = self.odometry.twist[1].linear.y
self.vz = self.odometry.twist[1].linear.z
self.p = self.odometry.twist[1].angular.x
self.q = self.odometry.twist[1].angular.y
self.r = self.odometry.twist[1].angular.z
def get_MotorsVel(self):
return self.mot0.data, self.mot1.data, self.mot2.data, self.mot3.data
def check_get_pose_ready(self):
self.odometry = None
rospy.logdebug("Waiting for /get_pose to be READY...")
while self.odometry is None and not rospy.is_shutdown():
try:
self.odometry = rospy.wait_for_message("/gazebo/model_states", ModelStates, timeout=5.0)
rospy.logdebug("Current /get_pose READY=>")
except:
rospy.logerr("Current /get_pose not ready yet, retrying for getting get_pose")
return self.odometry
def check_get_motors_ready(self):
self.mot0 = None
self.mot1 = None
self.mot2 = None
self.mot3 = None
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot0 is None and not rospy.is_shutdown():
try:
self.mot0 = rospy.wait_for_message('/ardrone/motor_speed/0', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot1 is None and not rospy.is_shutdown():
try:
self.mot1 = rospy.wait_for_message('/ardrone/motor_speed/1', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot2 is None and not rospy.is_shutdown():
try:
self.mot2 = rospy.wait_for_message('/ardrone/motor_speed/2', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
rospy.logdebug("Waiting for /get_motors to be READY...")
while self.mot3 is None and not rospy.is_shutdown():
try:
self.mot4 = rospy.wait_for_message('/ardrone/motor_speed/3', Float32, timeout=5.0)
rospy.logdebug("Current /get_motors READY=>")
except:
rospy.logerr("Current /get_motors not ready yet, retrying for getting get_motors")
return self.mot0, self.mot1, self.mot2, self.mot3
def step(self, action):
self.info = {}
start_time = rospy.Time.now().to_sec() # --> commented out in quadv1.py
for k in range(5):
if self.start_time is not None:
delay = rospy.Time.now().to_sec() - self.start_time
if delay < SAMPLING_TIME:
rospy.sleep(SAMPLING_TIME - delay)
self.get_Velocity()
self.random_yaw = torch.FloatTensor(1).uniform_(-np.radians(180), np.radians(180))
thrust_cmd = self.PositionController.ZVelocityController(action, self.vz)
roll_ref, pitch_ref, yaw_ref = self.PositionController.VelocityController(action, self.vx, self.vy, self.yaw, self.random_yaw)
self.get_RPY()
self.get_Velocity()
roll_cmd, pitch_cmd, yaw_cmd = self.PositionController.AttitudeController(roll_ref, pitch_ref, yaw_ref, self.roll, self.pitch, self.r, action, self.vx, self.vy, self.yaw, self.random_yaw)
W_FR, W_BL, W_FL, W_BR = self.PositionController.ControlMixer(roll_cmd, pitch_cmd, thrust_cmd, yaw_cmd, action, self.vx, self.vy, self.roll, self.pitch, self.yaw, self.r, roll_ref, pitch_ref, yaw_ref, self.random_yaw, self.vz)
self.send_commands(W_FR, W_BL, W_FL, W_BR)
self.start_time = rospy.Time.now().to_sec()
ros_gazebo.gazebo_pause_physics()
observation = self.get_observation()
done = self.is_done()
reward, info, done = self.get_reward(done)
truncated=False
cost=self.calculate_cost()
# Convert NumPy arrays to PyTorch tensors
observation, reward, cost, done, truncated = (
torch.as_tensor(x, dtype=torch.float32, device=self._device)
for x in (observation, reward, cost, done, truncated)
)
# # Get PyTorch tensor for final_observation in info
# if 'final_observation' in info:
# info['final_observation'] = np.array(
# [array if array is not None else np.zeros(observation.shape[-1]) for array in info['final_observation']],
# )
# info['final_observation'] = torch.as_tensor(
# info['final_observation'],
# dtype=torch.float32,
# device=self._device,
# )
ros_gazebo.gazebo_unpause_physics()
return observation, reward, cost, done, truncated, info
# def step(
# self,
# action):
# self.info = {}
# start_time = rospy.Time.now().to_sec()
# for k in range(5):
# if self.start_time is not None:
# delay = rospy.Time.now().to_sec() - self.start_time
# if delay < SAMPLING_TIME:
# rospy.sleep(SAMPLING_TIME - delay)
# #rospy.logwarn('delay:{}'.format(SAMPLING_TIME - delay))
# self.get_Velocity()
# #self.get_Pose() # if PID
# #thrust_cmd = self.PositionController.ZVelocityController(z_ref = 200, z = self.z, vz = self.vz) # if PID
# self.random_yaw = np.random.uniform(-np.radians(180), np.radians(180))
# thrust_cmd = self.PositionController.ZVelocityController(action, self.vz) #if RL
# #self.get_Pose() # if PID
# self.get_Velocity()
# #roll_ref, pitch_ref, yaw_ref = self.PositionController.VelocityController(x = self.x, y = self.y, x_ref = 200, y_ref = 100, vx = self.vx, vy = self.vy, yaw = self.yaw) # if PID
# roll_ref, pitch_ref, yaw_ref = self.PositionController.VelocityController(action,self.vx, self.vy, self.yaw, self.random_yaw) #if RL
# self.get_RPY()
# self.get_Velocity()
# roll_cmd, pitch_cmd, yaw_cmd = self.PositionController.AttitudeController(roll_ref, pitch_ref, yaw_ref, self.roll, self.pitch, self.r,action,self.vx, self.vy, self.yaw,self.random_yaw)
# W_FR, W_BL, W_FL, W_BR = self.PositionController.ControlMixer(roll_cmd, pitch_cmd, thrust_cmd, yaw_cmd,action,self.vx, self.vy,self.roll,self.pitch,self.yaw, self.r,roll_ref, pitch_ref, yaw_ref,self.random_yaw,self.vz)
# self.send_commands(W_FR, W_BL, W_FL, W_BR)
# self.start_time = rospy.Time.now().to_sec()
# #dtRL = rospy.Time.now().to_sec() - start_time
# #rospy.logwarn('dtRL:{}'.format(dtRL))
# ros_gazebo.gazebo_pause_physics()
# observation = self.get_observation()
# done = self.is_done()
# reward, info, done = self.get_reward(done,action)
# ros_gazebo.gazebo_unpause_physics()
# return observation, reward, done, info
def reset(self,
seed: int | None = None,
options: dict[str, Any] | None = None,
) -> tuple[torch.Tensor, dict[str, Any]]:
ros_gazebo.gazebo_reset_world()
ros_gazebo.gazebo_unpause_physics()
self.check_get_motors_ready()
self.check_get_pose_ready()
self.reset_variables()
self.set_goal() # random goals
#self.set_new_waypoint() # waypoints goals
self.reset_motors()
#self.set_quad_pose() # to start with random yaw -> set Position controller too
self.PositionController.reset_PIDs()
ros_gazebo.gazebo_pause_physics()
observation = self.get_observation()
ros_gazebo.gazebo_unpause_physics()
return observation,{}
def close (self):
rospy.loginfo("Closing Env")
rospy.signal_shutdown("Closing RobotGazeboEnvironment")
ros_gazebo.close_Gazebo()
ros_node.ros_kill_all_nodes()
ros_node.ros_kill_master()
ros_node.ros_kill_all_processes()
print("Closed ROS and Env")
def set_seed(self, seed: int) -> None:
pass
def sample_action(self) -> torch.Tensor:
pass
def render(self) -> Any:
pass
def send_commands(self,w1, w2, w3, w4):
global msg_mot
self.get_Pose()
msg_mot.angular_velocities.clear()
msg_mot.header.stamp = rospy.Time.now()
msg_mot.angular_velocities = [w1, w2, w3, w4]
self.commands_pub.publish(msg_mot)
#rospy.logwarn('Commands_Published:{}'.format(msg_mot.angular_velocities))
def get_reward(self,done):
c = 8 # 10, 12
GOAL_BONUS = 0
CRASH_PENALTY = 0
DISTANCE_REWARD = -self.distance_to_goal/100
TILT_PENALTY = -np.linalg.norm(self.drone_angular)/c
self.info = {}
terminated = False
if done == True:
CRASH_PENALTY = -100
self.reward = CRASH_PENALTY
self.info['is_success'] = False
terminated = True
elif done == False:
self.reward = DISTANCE_REWARD + TILT_PENALTY
if self.distance_to_goal < 0.15:
GOAL_BONUS = 100
self.reward = GOAL_BONUS
rospy.loginfo('Goal Reached')
rospy.loginfo('Distance:{}'.format(self.distance_to_goal))
self.info['is_success'] = True
terminated = True
self.cumulated_reward += self.reward
R1 = TILT_PENALTY
R2 = CRASH_PENALTY
R3 = GOAL_BONUS
R4 = DISTANCE_REWARD
TOTAL_REW = self.cumulated_reward
N_STEPS = self.counter_steps
DONE = terminated
ERRORE = self.distance_to_goal
self.dati = [TOTAL_REW, R1, R2, R3, R4, N_STEPS, DONE, ERRORE]
#self.add_row(self.dati) # comment out to save stats in csv file
return self.reward, self.info, terminated
def get_observation(self):
self.get_Pose()
self.get_Velocity()
self.get_RPY()
self.drone_position = np.array([self.x, self.y, self.z])
self.drone_velocity = np.array([self.vx, self.vy, self.vz])
self.drone_angular = np.array([self.p, self.q, self.r])
error_x = (self.random_goal[0] - self.x)/100
error_y = (self.random_goal[1] - self.y)/100
error_z = (self.random_goal[2] - self.z)/100
self.distance_to_goal = self.compute_distance_to_target(self.random_goal, self.drone_position)/100
#print(f"distance_to_goal:{self.distance_to_goal}")
goal_reached = self.is_goal_reached()
has_flipped = self.has_flipped()
if has_flipped == True:
crash = 1
else:
crash = 0
# TILT ENV OBS
obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.p, self.q, self.r,
self.roll/100, self.pitch/100, self.yaw/100,
goal_reached, crash])
# NO TILT ENV OBS
'''obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.roll/100, self.pitch/100, self.yaw/100,
goal_reached, crash])'''
# RANDOM YAW OBS
'''obs = np.array([error_x, error_y, error_z,
self.vx, self.vy, self.vz,
self.p, self.q, self.r,
self.roll/100, self.pitch/100, self.random_yaw - self.yaw/100,
goal_reached, crash])'''
#rospy.logerr("Observations:{}".format(np.array([obs])))
obs=torch.as_tensor(obs, dtype=torch.float32, device=self._device)
return obs
def set_quad_pose(self):
model_state_msg = ModelState()
roll = 0
pitch = 0
self.random_yaw = np.random.uniform(-np.radians(180), np.radians(180))
x,y,z,w = quaternion_from_euler(roll,pitch,self.random_yaw)
model_state_msg.model_name = 'ardrone'
model_state_msg.pose.position.x = 0.0
model_state_msg.pose.position.y = 0.0
model_state_msg.pose.position.z = 0.1
model_state_msg.pose.orientation.x = x
model_state_msg.pose.orientation.y = y
model_state_msg.pose.orientation.z = z
model_state_msg.pose.orientation.w = w
set_model_state = rospy.ServiceProxy('/gazebo/set_model_state', SetModelState)
response = set_model_state(model_state_msg)
def is_done(self):
done = False
self.counter_steps +=1
has_flipped = self.has_flipped()
if has_flipped == True:
done = True
#if self.counter_steps == LENGTH:
#self.plot()
return done
def is_goal_reached(self):
if self.distance_to_goal < 0.15:
goal_reached = 1
else:
goal_reached = 0
return goal_reached
def calculate_cost(self):
# weight for the distance component
distance_weight = 1.0
if self.distance_to_goal < 0.15:
cost = 0
else:
distance_penality = distance_weight * self.distance_to_goal
cost = distance_penality
print(f"Distance to goal: {self.distance_to_goal}")
print(f"Distance penality: {distance_penality}")
#print(f"costo:{cost}")
return cost
def set_goal(self):
x_goal = np.random.uniform(GOAL_MIN_X, GOAL_MAX_X)
y_goal = np.random.uniform(GOAL_MIN_Y, GOAL_MAX_Y)
z_goal = np.random.uniform(GOAL_MIN_Z, GOAL_MAX_Z)
self.random_goal = np.array([x_goal, y_goal, z_goal])
rospy.loginfo('New Goal:{}'.format(self.random_goal))
# def set_new_waypoint(self): #QUADV2
# # SQUARE TRAJECTORY WAYPOINTS
# self.waypoints = [[0 , 0, 200], [100, -100, 200], [100, 100, 200], [-100, 100, 200], [-100, -100, 200], [0, 0, 200]]
# x_goal = self.waypoints[self.waypoint_index][0]
# y_goal = self.waypoints[self.waypoint_index][1]
# z_goal = self.waypoints[self.waypoint_index][2]
# self.random_goal = np.array([x_goal, y_goal, z_goal])
# rospy.loginfo('New Goal:{}'.format(self.random_goal))
def compute_distance_to_target(self,a,b):
distance = np.linalg.norm(a-b)
return distance
def is_inside_workspace(self):
is_inside = False
self.get_Pose()
if self.x >= MIN_X and self.x <= MAX_X:
if self.y >= MIN_Y and self.y <= MAX_Y:
if self.z >= MIN_Z and self.z<= MAX_Z:
is_inside = True
return is_inside
def has_flipped(self):
self.get_RPY()
has_flipped = False
if not(-np.radians(90)*100 <= self.roll <= np.radians(90)*100) or not(-np.radians(90)*100 <= self.pitch <= np.radians(90)*100) :
has_flipped = True
return has_flipped
def reset_variables(self):
self.counter_steps = 0
self.cumulated_reward = 0.0
self.drone_position = None
self.drone_velocity = None
self.drone_angular = None
self.distance_to_goal = None
self.reward = 0.0
self.waypoint_index = 0
def reset_motors(self):
global msg_mot, msg_motor
w1,w2,w3,w4 = self.get_MotorsVel()
while not(round(w1,2) == 0.0 and round(w2,2) == 0.0 and round(w3,2) == 0.0 and round(w4,2) == 0.0):
msg_mot.angular_velocities.clear()
msg_mot.header.stamp = rospy.Time.now()
msg_mot.angular_velocities = [0,0,0,0]
self.commands_pub.publish(msg_mot)
w1,w2,w3,w4 = self.get_MotorsVel()
velocities = np.array([w1,w2,w3,w4])
#rospy.logwarn('Motors:{}'.format(np.array([velocities])))
def make_csv(self):
rospack = rospkg.RosPack()
pkg_path = rospack.get_path('tests')
intestazioni = ['CRASH', 'GOAL', 'N_STEPS', 'DONE', 'ERRORE', 'X', 'Y', 'Z', 'VX', 'VY', 'VZ', 'X_GOAL', 'Y_GOAL', 'Z_GOAL', 'ROLL', 'PITCH', 'YAW', 'WX', 'WY', 'WZ','VX_ACTION', 'VY_ACTION', 'VZ_ACTION', 'DISTANCE_REW', 'TILT_REW', 'TOTAL_REW', 'X_REF', 'Y_REF', 'Z_REF']
self.file_csv = pkg_path + '/saved_source/CPO-{Custom1-v0}/' + self.filecsv
with open(self.file_csv, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(intestazioni)
def add_row(self, dati):
with open(self.file_csv, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow(dati)
After running the command python3 train_policy.py --algo CPO --env-id Custom1-v0, I obtain NaN values for ep_cost, as can be seen in the attached figure. I have also printed the values related to the distance from the goal and the cost, and since they are neither null nor very small, I don't understand where this NaN is coming from. Thank you very much in advance for your help.
`ep_cost` means the total cost until `done==True` or `truncated==True`. OmniSafe runs 20000 steps per epoch (the `steps_per_epoch` config in `omnisafe/configs/on-policy/CPO.yaml`). It would be 'nan' if your environment is neither done nor truncated within 20000 steps. You can try to activate truncation within a specific number of steps (e.g. 1000 steps in Safety Gymnasium), or simply set `need_time_limit_wrapper=True`.
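A minimal sketch of the first option, reusing the helper methods from the environment posted above (`get_observation`, `is_done`, `get_reward`, `calculate_cost`); `TruncatedStepMixin` and `MAX_EPISODE_STEPS` are hypothetical names, and the exact return signature should match your `CMDP` subclass:

```python
from __future__ import annotations

from typing import Any

import torch


class TruncatedStepMixin:
    """Sketch only: finish the episode after a fixed step budget so OmniSafe
    can log ep_cost instead of reporting nan."""

    MAX_EPISODE_STEPS: int = 1000  # hypothetical horizon

    def step(self, action: torch.Tensor) -> tuple[Any, ...]:
        obs = self.get_observation()
        done = self.is_done()  # also advances self.counter_steps in the code above
        reward, info, terminated = self.get_reward(done)
        cost = torch.as_tensor(self.calculate_cost(), dtype=torch.float32)
        # truncate once the step budget is spent, even without a crash
        truncated = torch.as_tensor(self.counter_steps >= self.MAX_EPISODE_STEPS)
        return (
            obs,
            torch.as_tensor(reward, dtype=torch.float32),
            cost,
            torch.as_tensor(terminated),
            truncated,
            info,
        )
```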
Sorry, I don't understand your meaning. Whatever the scale of the environment, will the action finally be changed to [-1, 1] after the actor of OmniSafe? Can't it be modified?
It seems that the "predict" function in "gaussian_learning_actor.py" doesn't keep the action value within the range you defined. But I suppose it can be recognized as a constraint in SRL.
Good evening, I wanted to know if it's possible to implement a callback function to save my training data. Thank you very much.
I don't quite understand what you mean by training data. In fact, OmniSafe's logger saves the output of the terminal during training as a CSV table in the `examples/runs` folder. If you need to log additional data, consider modifying the values logged by the logger in `omnisafe/adapter/onpolicy_adapter.py`.
Hello, with my statement, I meant to ask if there is a way to save the progress of my agent during and after training. This way, I can resume training from where it was interrupted, thus avoiding long training times. Subsequently, I can use a function like agent.load(). I read that it's possible with TensorFlow using a function called checkpoint_callback. I wanted to know if it's possible to implement something similar in Omnisafe. Thank you very much.
Currently, OmniSafe does not support training with checkpoints. We think this is a valuable feature and will support it in the near future. As this involves many code changes, it will take a long time. We'll inform you of some possible directions first:

- OmniSafe keeps parameter files for `actor` and `obs_normalizer` with a `.pt` suffix.
- Add the path to the previously saved model as `args` to the `examples/train_policy.py` script.
- Add `reward_critic` and `cost_critic` to the `what_to_save` of the `_init_log` function in `omnisafe/algorithms/on-policy/policy_gradient.py` (a sketch of this step is given below).
- Import the actor and critic parameters in the `_init_model` function.
- Import the parameters for `obs_normalizer` under `omnisafe/adapter/online_adapter.py`.
- Restore the epoch count using the data recorded in `progress.csv`.

The above is only theoretical advice; the actual implementation may encounter problems. Please feel free to keep in touch with us. In addition, `examples/collect_offline_data.py` provides a similar solution, which can be used as a reference for implementation.
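A hedged sketch of the `what_to_save` change from the list above, paraphrasing (not copying) the `_init_log` setup in `omnisafe/algorithms/on-policy/policy_gradient.py`; attribute names such as `self._actor_critic.reward_critic` are assumptions based on the current OmniSafe layout and may differ between versions:

```python
# inside PolicyGradient._init_log (paraphrased, not the verbatim source)
what_to_save: dict = {}
what_to_save['pi'] = self._actor_critic.actor
# additionally keep both critics so a checkpoint can restore the full model
what_to_save['reward_critic'] = self._actor_critic.reward_critic
what_to_save['cost_critic'] = self._actor_critic.cost_critic
if self._cfgs.algo_cfgs.obs_normalize:
    what_to_save['obs_normalizer'] = self._env.save()['obs_normalizer']
self._logger.setup_torch_saver(what_to_save)
```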
Good evening. I am encountering difficulties in saving the .pt files in the torch_save folder. Specifically, it only saves the .pt file corresponding to epoch 0, while for the remaining epochs, the corresponding .pt file is not generated. Studying the logger.py file in the common folder, it seems that the value of self._epoch is not being correctly updated. I noticed that in the Test folder you shared, even though there are multiple epochs in the progress.csv file, only the epoch-0.pt file is present in the torch_save folder. How can I generate the corresponding .pt files for each epoch? Thank you very much.
You can set `save_model_freq` to `1` in `omnisafe/configs/on-policy/ALGO.yaml` to save a .pt file every epoch.
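If you would rather not edit the YAML file directly, the same value can likely be overridden from the training script through `custom_cfgs`; a sketch, assuming the standard `omnisafe.Agent` entry point and the `Custom1-v0` id used above:

```python
import omnisafe

# override logger_cfgs.save_model_freq so a .pt file is written every epoch
custom_cfgs = {
    'logger_cfgs': {
        'save_model_freq': 1,
    },
}
agent = omnisafe.Agent('CPO', 'Custom1-v0', custom_cfgs=custom_cfgs)
agent.learn()
```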
Good evening, after trying to reinstall Omnisafe, when I attempt to run the training, it says that there is no module called Omnisafe. Could someone help me understand and resolve the error? Thank you very much.
Rerunning `pip install -e .` in the root path may help. Also, please double-check that your Python environment is well set up and activated.
Okay, thank you. So, in my case, I need to run it in the path catkin_ws/src/omnisafe. Unfortunately, even rerunning the command 'pip install -e .' doesn't resolve the error.
Good morning, I would like to understand how to test my custom environment.
You can refer to `tests/test_env.py` and `tests/test_policy.py` for unit test examples.
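A minimal sketch of such a test, loosely modeled on `tests/test_env.py`; it assumes the `Custom0-v0` registration from earlier in this thread, and an environment backed by ROS/Gazebo would of course need the simulator running:

```python
import torch

from omnisafe.envs.core import make, support_envs


def test_custom_env_is_registered() -> None:
    # env_register should expose every id listed in _support_envs
    assert 'Custom0-v0' in support_envs()


def test_custom_env_reset_returns_tensor() -> None:
    env = make('Custom0-v0')
    obs, info = env.reset(seed=0)
    assert isinstance(obs, torch.Tensor)
    assert isinstance(info, dict)
    env.close()
```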
Good evening, I would like to understand how I can include the number of collisions that occurred with obstacles among the saved data. In particular, I have defined in my custom environment the function that allows me to calculate the number of collisions and the obstacles with which they occur.
def check_collisions(self):
    obstacles = zip(self.obs_names, self.obs_positions, self.obs_dimensions)
    collision_count = 0
    collided_obstacles = []
    for obstacle_name, obstacle_position, obstacle_dimensions in obstacles:
        distance_obs = self.compute_distance_to_obstacles(self.drone_position, obstacle_position, obstacle_dimensions)/100
        #print(f"distance from {obstacle_name} : {distance_obs}")
        if distance_obs < 0.1:
            collision_count += 1
            collided_obstacles.append(obstacle_name)
    print(f"Collision Count: {collision_count}")
    #print(f"Collided Obstacles: {collided_obstacles}")
    return collision_count, collided_obstacles
Now I need to plot the total number of collisions for each epoch. I tried to modify the policy_gradient.py file, but I can't visualize the result.
Good evening. After carefully reading your patient replies to all the previous questions, I have successfully customized my own environment and trained it by running the train_policy.py script. Now I have obtained the .pt file in the \runs...\torch_save folder. I would like to load this .pt file and test it in my customized environment using code similar to the following:
model = PPOLag.load("./xx.pt")
obs = env.reset(test=True)
while True:
    action = model.predict(obs)
    obs, rewards, cost, dones, _, info = env.step(action)
    if dones:
        break
I would greatly appreciate it if you could provide guidance on what exactly I need to implement in order to achieve this. Thank you in advance for your assistance.
@NoemiF99 You can modify the code concerning OmniSafe's logging of environmental parameters such as rewards and costs. Just add an item named `number_of_collisions`, handled in the same way as the epoch total reward/cost. The data will be archived in the progress.csv file, facilitating visualization.
@tjruan You can refer to the entry file `examples/evaluate_saved_policy.py`. Just use the evaluator to test it.
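For @tjruan, a hedged sketch following the pattern of `examples/evaluate_saved_policy.py`; `LOG_DIR` is a placeholder for your own run directory, and the `Evaluator` keyword arguments may differ between OmniSafe versions:

```python
import os

import omnisafe

LOG_DIR = './path/to/your/run'  # placeholder: the folder that contains torch_save/

evaluator = omnisafe.Evaluator()
for item in os.scandir(os.path.join(LOG_DIR, 'torch_save')):
    # evaluate every saved checkpoint (e.g. epoch-0.pt, epoch-10.pt, ...)
    if item.is_file() and item.name.endswith('.pt'):
        evaluator.load_saved(save_dir=LOG_DIR, model_name=item.name)
        evaluator.evaluate(num_episodes=1)
```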
Good evening, thank you very much for responding. I would like to understand in which file I can add the number of collisions that I have calculated, so that I can append it to the progress.csv file.
@NoemiF99 You can:

- Return the number of collisions in the `info` dict of the `step` function of your customized environment file, for example: {number_of_collisions: 10} (10 is the supposed number of collisions).
- Define something like `_ep_num_colli` in `omnisafe/adapter/onpolicy_adapter.py`, then log it from `info` (since you have logged it in step 1) in the methods `_log_value`, `_log_metrics` and `_reset_log`, just following what we do to log reward/cost.
- To record it as `Metrics/EpColi`, make the corresponding change in `policy_gradient.py` to log it just like how we log `Metrics/EpRet`.

A rough sketch of the adapter changes is given below.
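A hedged sketch of the second and third steps, written as a subclass for readability (the actual suggestion is to edit `omnisafe/adapter/onpolicy_adapter.py` in place); the method signatures are paraphrased from the thread and may not match your OmniSafe version exactly:

```python
from __future__ import annotations

import torch

from omnisafe.adapter.onpolicy_adapter import OnPolicyAdapter


class CollisionLoggingAdapter(OnPolicyAdapter):
    """Sketch only: accumulate the 'number_of_collisions' entry from info."""

    def _reset_log(self, idx: int | None = None) -> None:
        super()._reset_log(idx)
        if idx is None:
            self._ep_num_colli = torch.zeros(self._env.num_envs)
        else:
            self._ep_num_colli[idx] = 0.0

    def _log_value(self, reward, cost, info) -> None:
        super()._log_value(reward=reward, cost=cost, info=info)
        self._ep_num_colli += info.get('number_of_collisions', 0.0)

    def _log_metrics(self, logger, idx: int) -> None:
        super()._log_metrics(logger, idx)
        # 'Metrics/EpColi' also has to be registered by the algorithm's logger
        # (e.g. in policy_gradient.py) before it can appear in progress.csv
        logger.store({'Metrics/EpColi': self._ep_num_colli[idx]})
```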
Good afternoon.
Thank you so much for your patient reply earlier; it was extremely helpful.
I'm currently facing another issue. In my custom environment custom_env.py, I have set the action_space to be within the range [0, 1]. Here's the code snippet:
self._action_space = Box(low=0.0, high=1.0, shape=(1,))
However, during training, I printed out the action values and realized that they are outside this specified range. After carefully reading the discussion above, I made changes to the online_adapter.py file as follows:
self._env = ActionScale(self._env, low=0.0, high=1.0, device=self._device)
self._eval_env = ActionScale(self._eval_env, low=0.0, high=1.0, device=self._device)
Unfortunately, this modification didn't solve the problem. I sincerely appreciate any further assistance you can provide. Thank you.
Questions
Question 1: I just want to use our own environment, and I found omnisafe-main/tests/simple_env.py, so I attempted to modify simple_env into our own environment. However, when running omnisafe-main/examples/train_policy.py, simple_env couldn't be introduced. What should I use to train simple_env.py?
Question 2: I learned from issues #273, #263, #255 and the GitHub README.md, and found the following. Could you give a specific environment example? Is it the same as omnisafe-main/tests/simple_env? Thank you very much!