Closed · Steven7099 closed this 2 years ago
Got it working!
import os

import gym
import numpy as np
from controller import Supervisor
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
class OpenAIGymEnvironment(Supervisor, gym.Env):
    def __init__(self, max_episode_steps=1000):
        super().__init__()
        # Commanded steering angle in radians; updated in step().
        self.angle = 0
        # Steering motor handles; set in reset().
        self.right_turn = None
        self.left_turn = None
        # Latest sensor readings; updated in step().
        self.side_right_value = None
        self.side_left_value = None
        self.front_right_value = None
        self.front_left_value = None
        self.front_center_value = None
        # Distance sensor device handles; set in reset().
        self.side_right_sensor = None
        self.side_left_sensor = None
        self.front_right_sensor = None
        self.front_left_sensor = None
        self.front_center_sensor = None
        # Observation: [steer angle (x2), side right, side left, front right,
        # front left, front center], with sensor readings in [0, 1000].
        low = np.array([-0.35, -0.35, 0, 0, 0, 0, 0])
        high = np.array([0.35, 0.35, 1000, 1000, 1000, 1000, 1000])
        # OpenAI Gym generic
        self.action_space = gym.spaces.Discrete(3)
        self.observation_space = gym.spaces.Box(low, high, dtype=np.float32)
        self.state = None
        self.spec = gym.envs.registration.EnvSpec(id='WebotsEnv-v0', max_episode_steps=max_episode_steps)
        # Environment specific
        self.__timestep = int(self.getBasicTimeStep())
    def reset(self):
        # Reset the simulation.
        self.simulationResetPhysics()
        self.simulationReset()
        super().step(self.__timestep)

        # Steering motors: position-controlled, start centered.
        self.left_turn = self.getDevice('left_steer')
        self.right_turn = self.getDevice('right_steer')
        self.right_turn.setPosition(0)
        self.left_turn.setPosition(0)

        # Drive motors: velocity control at a constant speed.
        for name in ['left_rear_wheel', 'right_rear_wheel', 'left_front_wheel', 'right_front_wheel']:
            tire = self.getDevice(name)
            tire.setPosition(float('inf'))
            tire.setVelocity(5)

        # Distance sensors.
        self.front_center_sensor = self.getDevice('front_center_sensor')
        self.front_left_sensor = self.getDevice('front_left_sensor')
        self.front_right_sensor = self.getDevice('front_right_sensor')
        self.side_left_sensor = self.getDevice('side_left_sensor')
        self.side_right_sensor = self.getDevice('side_right_sensor')
        self.front_center_sensor.enable(self.__timestep)
        self.front_left_sensor.enable(self.__timestep)
        self.front_right_sensor.enable(self.__timestep)
        self.side_left_sensor.enable(self.__timestep)
        self.side_right_sensor.enable(self.__timestep)

        # Let the simulation settle for one step.
        super().step(self.__timestep)

        # Initial observation: zeros for all 7 entries. The dtype must match
        # the observation space, hence the float32 cast.
        return np.zeros(7, dtype=np.float32)
    def step(self, action):
        # Execute the action: 0 = steer left, 1 = straight, 2 = steer right.
        if action == 0:
            self.angle = -0.35
        elif action == 2:
            self.angle = 0.35
        else:
            self.angle = 0
        self.right_turn.setPosition(self.angle)
        self.left_turn.setPosition(self.angle)
        super().step(self.__timestep)

        # Observation: read the front sensors...
        self.front_left_value = self.front_left_sensor.getValue()
        self.front_center_value = self.front_center_sensor.getValue()
        self.front_right_value = self.front_right_sensor.getValue()
        # ...and the side sensors.
        self.side_left_value = self.side_left_sensor.getValue()
        self.side_right_value = self.side_right_sensor.getValue()
        # Both steering entries carry the same commanded angle.
        self.state = np.array([self.angle, self.angle, self.side_right_value,
                               self.side_left_value, self.front_right_value,
                               self.front_left_value, self.front_center_value])

        # Done when any distance sensor reads above 950, i.e. the car is about
        # to hit a wall.
        done = bool(
            self.state[2] > 950 or
            self.state[3] > 950 or
            self.state[4] > 950 or
            self.state[5] > 950 or
            self.state[6] > 950
        )

        # Reward: 1 for every step survived, 0 on collision.
        reward = 0 if done else 1

        return self.state.astype(np.float32), reward, done, {}
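
    # Possible refinement of the reward above (an untested sketch, not part of
    # the working code): shape it with the distance sensors so the agent is
    # rewarded for staying away from the walls, e.g.
    #   closest = float(np.max(self.state[2:]))  # larger reading = nearer wall
    #   reward = 1.0 - closest / 1000.0          # ~1 in open space, ~0 at a wall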

def main():
    # Create the environment and verify it follows the Gym API.
    env = OpenAIGymEnvironment()
    check_env(env)

    models_dir = "models/PPO"
    logdir = "logs"
    os.makedirs(models_dir, exist_ok=True)
    os.makedirs(logdir, exist_ok=True)

    # Train. tensorboard_log must be set for tb_log_name below to take effect.
    model = PPO('MlpPolicy', env, n_steps=2048, verbose=1, tensorboard_log=logdir)
    # model = PPO.load()  # alternatively, resume from a saved checkpoint

    TIMESTEPS = 10000
    for i in range(30):
        model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="PPO")
        model.save(f"{models_dir}/{TIMESTEPS * (i + 1)}")

    # Run the trained policy.
    obs = env.reset()
    for _ in range(10000):
        action, _states = model.predict(obs)
        obs, reward, done, info = env.step(action)
        print(obs, reward, done, info)
        if done:
            obs = env.reset()


if __name__ == '__main__':
    main()
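
To flesh out the commented-out PPO.load() line above: resuming training from one of the saved checkpoints could look roughly like this (the checkpoint name is just an example following the naming scheme above, and the environment has to be passed back in, since a loaded model has no environment attached):

env = OpenAIGymEnvironment()
model = PPO.load("models/PPO/10000", env=env)  # example checkpoint path
model.learn(total_timesteps=10000, reset_num_timesteps=False)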
Nice. Do you think it could be useful to improve the provided example?
I'm following the OpenAI Gym example and trying to implement it on the default Altino car: https://github.com/cyberbotics/webots/blob/master/projects/samples/howto/openai_gym/controllers/openai_gym/openai_gym.py
I keep getting this assertion from check_env (raised in stable_baselines3's _check_obs):

    assert observation_space.contains(obs)
AssertionError: The observation returned by the `reset()` method does not match the given observation space
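
(For anyone hitting the same assertion: check_env calls reset() and checks the returned array against observation_space with observation_space.contains(obs), so shape, bounds and dtype all have to match. In particular, recent gym versions reject a float64 array for a float32 Box, which is what the .astype(np.float32) casts in the code above take care of. A minimal standalone illustration:)

import gym
import numpy as np

space = gym.spaces.Box(low=-1.0, high=1.0, shape=(7,), dtype=np.float32)

obs = np.zeros(7)           # dtype defaults to float64
print(space.contains(obs))  # False on recent gym versions: float64 does not
                            # safely cast to float32, so check_env's assert fires

print(space.contains(obs.astype(np.float32)))  # True: shape, bounds and dtype match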
My goal is to use the 3 front sensors and 2 side sensors to teach the car not to hit the walls while going down a racetrack.
I'm new to OpenAI Gym and hope that, troubles and all, this can become another Webots example for the Altino car.