Closed lostboy233333 closed 1 year ago
Hi!
After vehicles are created (in the environment's _create_vehicles()
method), you can access these positions simply with vehicle.position
(which is a 2d numpy array).
For instance, you can do the following:
landmark.position = (vehicle1.position + vehicle2.position) / 2
Dear author I have tried your method and can achieve that the initial position of landmark is the desired position, but the landmark is always in the initial position and cannot move with the movement of vehicles. I wonder if there is any way to update the landmark position after "env.step" as well?This is the code of my environment. Can you help me find the problem? Looking forward to your reply.
from typing import Dict, Text
import numpy as np
from gym.envs.registration import register
from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.envs.common.action import Action
from highway_env.road.road import Road, RoadNetwork
from highway_env.utils import near_split
from highway_env.vehicle.controller import ControlledVehicle
from highway_env.vehicle.kinematics import Vehicle
from highway_env.vehicle.behavior import IDMVehicle
from highway_env.vehicle.objects import Landmark, Obstacle
from highway_env.envs.common.observation import MultiAgentObservation, observation_factory
from abc import abstractmethod
from gym import Env
Observation = np.ndarray
class GoalEnv(Env):
"""
Interface for A goal-based environment.
This interface is needed by agents such as Stable Baseline3's Hindsight Experience Replay (HER) agent.
It was originally part of https://github.com/openai/gym, but was later moved
to https://github.com/Farama-Foundation/gym-robotics. We cannot add gym-robotics to this project's dependencies,
since it does not have an official PyPi package, PyPi does not allow direct dependencies to git repositories.
So instead, we just reproduce the interface here.
A goal-based environment. It functions just as any regular OpenAI Gym environment but it
imposes a required structure on the observation_space. More concretely, the observation
space is required to contain at least three elements, namely `observation`, `desired_goal`, and
`achieved_goal`. Here, `desired_goal` specifies the goal that the agent should attempt to achieve.
`achieved_goal` is the goal that it currently achieved instead. `observation` contains the
actual observations of the environment as per usual.
"""
@abstractmethod
def compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict) -> float:
"""Compute the step reward. This externalizes the reward function and makes
it dependent on a desired goal and the one that was achieved. If you wish to include
additional rewards that are independent of the goal, you can include the necessary values
to derive it in 'info' and compute it accordingly.
Args:
achieved_goal (object): the goal that was achieved during execution
desired_goal (object): the desired goal that we asked the agent to attempt to achieve
info (dict): an info dictionary with additional information
Returns:
float: The reward that corresponds to the provided achieved goal w.r.t. to the desired
goal. Note that the following should always hold true:
ob, reward, done, info = env.step()
assert reward == env.compute_reward(ob['achieved_goal'], ob['desired_goal'], info)
"""
raise NotImplementedError
class HighwayEnv01(AbstractEnv, GoalEnv):
"""
A highway driving environment.
The vehicle is driving on a straight highway with several lanes, and is rewarded for reaching a high speed,
staying on the rightmost lanes and avoiding collisions.
"""
PARKING_OBS = {"observation": {
"type": "KinematicsGoal",
"features": ['x', 'y', 'vx', 'vy', 'cos_h', 'sin_h'],
"scales": [100, 100, 5, 5, 1, 1],
"normalize": False
}}
def __init__(self, config: dict = None) -> None:
super().__init__(config)
self.observation_type_parking = None
@classmethod
def default_config(cls) -> dict:
config = super().default_config()
config.update({
"observation": {
"type": "Kinematics",
"features": ['x', 'y', 'vx', 'vy', 'cos_h', 'sin_h'],
"scales": [100, 100, 5, 5, 1, 1],
"normalize": False,
"absolute": True
},
"action": {
"type": "DiscreteMetaAction"
},
"reward_weights": [1, 0.3, 1, 0.3, 0.02, 0.02], #
"success_goal_reward": 0.12,
"collision_reward": -500,
"lanes_count": 4,
"vehicles_count": 5,
"controlled_vehicles": 1,
"initial_lane_id": None,
"duration": 40, # [s]
"ego_spacing": 2,
"vehicles_density": 1,
"normalize_reward": True,
"offroad_terminal": True,
"screen_width": 800,
"screen_height": 600,
"simulation_frequency": 15,
"policy_frequency": 5,
"absolute": False
})
return config
def define_spaces(self) -> None:
"""
Set the types and spaces of observation and action from config.
"""
super().define_spaces()
self.observation_type_parking = observation_factory(self, self.PARKING_OBS["observation"])
def _info(self, obs, action) -> dict:
info = super(HighwayEnv01, self)._info(obs, action)
if isinstance(self.observation_type, MultiAgentObservation):
success = tuple(self._is_success(agent_obs['achieved_goal'], agent_obs['desired_goal']) for agent_obs in obs)
else:
obs = self.observation_type_parking.observe()
success = self._is_success(obs['achieved_goal'], obs['desired_goal'])
info.update({"is_success": success})
return info
def _reset(self) -> None:
self._create_road()
self._create_vehicles()
def _create_road(self) -> None:
"""Create a road composed of straight adjacent lanes."""
self.road = Road(network=RoadNetwork.straight_road_network(self.config["lanes_count"], speed_limit=30,nodes_str=("a","b")),
np_random=self.np_random, record_history=self.config["show_trajectories"])
def _create_vehicles(self) -> None:
"""Create some new random vehicles of a given type, and add them on the road."""
# Controlled vehicles
self.controlled_vehicles = []
for i in range(self.config["controlled_vehicles"]):
vehicle = Vehicle.create_random(
self.road,
speed= 1,
lane_id= 2,
spacing=self.config["ego_spacing"]
)
vehicle = self.action_type.vehicle_class(self.road, vehicle.position, vehicle.heading, vehicle.speed)
vehicle.color = (200, 0, 150)
self.controlled_vehicles.append(vehicle)
self.road.vehicles.append(vehicle)
# platoon
# car 1
vehicle1 = Vehicle.create_random(self.road, speed=1, lane_id=1, spacing=1)
vehicle1 = self.action_type.vehicle_class(
self.road,
vehicle1.position,
vehicle1.heading,
vehicle1.speed,
)
self.vehicle = vehicle1
self.road.vehicles.append(vehicle1)
# car 2
vehicle2 = Vehicle.create_random(self.road, speed=1, lane_id=1, spacing=1)
vehicle2 = self.action_type.vehicle_class(
self.road,
vehicle2.position,
vehicle2.heading,
vehicle2.speed,
)
self.road.vehicles.append(vehicle2)
# car 3
vehicle3 = Vehicle.create_random(self.road, speed=1, lane_id=1, spacing=1)
vehicle3 = self.action_type.vehicle_class(
self.road,
vehicle3.position,
vehicle3.heading,
vehicle3.speed,
)
self.road.vehicles.append(vehicle3)
# car 4
vehicle4 = Vehicle.create_random(self.road, speed=1, lane_id=1, spacing=2)
vehicle4 = self.action_type.vehicle_class(
self.road,
vehicle4.position,
vehicle4.heading,
vehicle4.speed,
)
self.road.vehicles.append(vehicle4)
# goal
lane = self.np_random.choice(self.road.network.lanes_list())
self.goal = Landmark(self.road, (vehicle3.position+vehicle4.position)/2, heading=lane.heading, speed=1)
self.road.objects.append(self.goal)
def compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict, p: float = 0.5) -> float:
"""
Proximity to the goal is rewarded
We use a weighted p-norm
:param achieved_goal: the goal that was achieved
:param desired_goal: the goal that was desired
:param dict info: any supplementary information
:param p: the Lp^p norm used in the reward. Use p<1 to have high kurtosis for rewards in [0, 1]
:return: the corresponding reward
"""
return -np.power(np.dot(np.abs(achieved_goal - desired_goal), np.array(self.config["reward_weights"])), p)
def _reward(self, action: np.ndarray) -> float:
obs = self.observation_type_parking.observe()
obs = obs if isinstance(obs, tuple) else (obs,)
reward = sum(self.compute_reward(agent_obs['achieved_goal'], agent_obs['desired_goal'], {}) for agent_obs in obs)
reward += self.config['collision_reward'] * sum(v.crashed for v in self.controlled_vehicles)
return reward
def _is_success(self, achieved_goal: np.ndarray, desired_goal: np.ndarray) -> bool:
return self.compute_reward(achieved_goal, desired_goal, {}) > -self.config["success_goal_reward"]
def _is_terminated(self) -> bool:
"""The episode is over if the ego vehicle crashed."""
return self.vehicle.crashed or \
(self.config["offroad_terminal"] and not self.vehicle.on_road)
def _is_truncated(self) -> bool:
"""The episode is over if the ego vehicle crashed or the time is out."""
return self.time >= self.config["duration"]
class HighwayEnvFast(HighwayEnv01):
"""
A variant of highway-v0 with faster execution:
- lower simulation frequency
- fewer vehicles in the scene (and fewer lanes, shorter episode duration)
- only check collision of controlled vehicles with others
"""
@classmethod
def default_config(cls) -> dict:
cfg = super().default_config()
cfg.update({
"simulation_frequency": 5,
"lanes_count": 3,
"vehicles_count": 20,
"duration": 30, # [s]
"ego_spacing": 1.5,
})
return cfg
def _create_vehicles(self) -> None:
super()._create_vehicles()
# Disable collision check for uncontrolled vehicles
for vehicle in self.road.vehicles:
if vehicle not in self.controlled_vehicles:
vehicle.check_collisions = False
register(
id='highway_env_1',
entry_point='highway_env.envs:HighwayEnv01',
)
register(
id='highway-fast-v0',
entry_point='highway_env.envs:HighwayEnvFast',
)
'''
Hi, Yes if you want to update the landmark's position, you should do it everytime the env.step function is called. And ideally, you actually want to do it after we ran the dynamics and updated the vehicles' positions, but before we compute rewards, observations etc., so here:
The easiest you can do is probably to override the _simulate()
method in your environment as follows:
def _simulate(self, action):
super()._simulate(action)
# also simulate the landmark motion
self.landmark.position = (self.vehicle3.position+self.vehicle4.position)/2
Of course, you would have to also edit _create_vehicles()
to save
self.vehicle3 = vehicle3
self.vehicle4 = vehicle4
My problem solved! Thank you for your patience.
Dear author I want to produce a landmark that changes with the state of the vehicle, but I don't know how to get the current location of the vehicle when defining the environment. In the image below, I want the landmark to always appear between car 3 and car 4, but I don't know how to get the real-time location of car 3 and car 4。Could you tell me what to do? Looking forward to your reply.