Farama-Foundation / HighwayEnv

A minimalist environment for decision-making in autonomous driving
https://highway-env.farama.org/
MIT License
2.57k stars 732 forks source link

Custom Actionspace return None #538

Closed seominseok00 closed 9 months ago

seominseok00 commented 9 months ago

image

http://highway-env.farama.org/dynamics/vehicle/behavior/#longitudinal-behavior

I created a Custom Action Space to optimize the parameters $v_0$ (desired velocity) and $a$ (maximum acceleration) of an IDM model using reinforcement learning, but it keeps returning None for actions.

Can you check if there's anything I've done wrong?

  1. Definition of Custom Action Space:

highway_env/envs/common/action.py

class IDMAction(ActionType):

    """
    An continuous action space for maximum acceleration and/or desired velocity.

    If both maximum acceleration and desired velocity are enabled, they are set in this order: [COMFORT_ACC_MAX, target_velocity]

    The space intervals are always [-1, 1], but are mapped to maximum acceleration/desired velocity intervals through configurations.
    """

    ACCELERATION_RANGE = (-5, 3.0)
    """Maximum acceleration range: [-x, x], in m/s²."""

    VELOCITY_RANGE = (1, 10)
    """Desired velocity range: [-x, x], in m/s."""

    def __init__(self,
                 env: 'AbstractEnv',
                 acceleration_range: Optional[Tuple[float, float]] = None,
                 velocity_range: Optional[Tuple[float, float]] = None,
                 speed_range: Optional[Tuple[float, float]] = None,
                 COMFORT_ACC_MAX: bool = True,
                 target_velocity: bool = True,
                 dynamical: bool = False,
                 clip: bool = True,
                 **kwargs) -> None:
        """
        Create a continuous action space.

        :param env: the environment
        :param acceleration_range: the range of maximum acceleration values [m/s²]
        :param velocity_range: the range of desired velocity values [m/s]
        :param speed_range: the range of reachable speeds [m/s]
        :param COMFORT_ACC_MAX: enable maximum acceleration control
        :param target_velocity: enable desired velocity control
        :param dynamical: whether to simulate dynamics (i.e. friction) rather than kinematics
        :param clip: clip action to the defined range
        """
        super().__init__(env)
        self.acceleration_range = acceleration_range if acceleration_range else self.ACCELERATION_RANGE
        self.velocity_range = velocity_range if velocity_range else self.VELOCITY_RANGE
        self.speed_range = speed_range
        self.COMFORT_ACC_MAX = COMFORT_ACC_MAX
        self.target_velocity = target_velocity
        if not self.COMFORT_ACC_MAX and not self.target_velocity:
            raise ValueError("Either COMFORT_ACC_MAX and/or target_velocity control must be enabled")
        self.dynamical = dynamical
        self.clip = clip
        self.size = 2 if self.COMFORT_ACC_MAX and self.target_velocity else 1
        self.last_action = np.zeros(self.size)

    def space(self) -> spaces.Box:
        return spaces.Box(-1., 1., shape=(self.size,), dtype=np.float32)

    @property
    def vehicle_class(self) -> Callable:
        return IDMMDPVehicle

    def get_action(self, action: np.ndarray):
        if self.clip:
            action = np.clip(action, -1, 1)
        if self.speed_range:
            (self.controlled_vehicle.MIN_SPEED, self.controlled_vehicle.MAX_SPEED) = self.speed_range
        if self.COMFORT_ACC_MAX and self.target_velocity:
            return {
                "COMFORT_ACC_MAX": utils.lmap(action[0], [-1, 1], self.acceleration_range),
                "target_velocity": utils.lmap(action[1], [-1, 1], self.velocity_range)
            }
        elif self.COMFORT_ACC_MAX:
            return {
                "COMFORT_ACC_MAX": utils.lmap(action[0], [-1, 1], self.acceleration_range),
                "target_velocity": 0,
            }
        elif self.target_velocity:
            return {
                "COMFORT_ACC_MAX": 0,
                "target_velocity": utils.lmap(action[1], [-1, 1], self.velocity_range)
            }

    def act(self, action: np.ndarray) -> None:
        self.controlled_vehicle.act(self.get_action(action))
        self.last_action = action

def action_factory(env: 'AbstractEnv', config: dict) -> ActionType:
    if config["type"] == "ContinuousAction":
        return ContinuousAction(env, **config)
    elif config["type"] == "IDMAction":
        return IDMAction(env, **config)
    if config["type"] == "DiscreteAction":
        return DiscreteAction(env, **config)
    elif config["type"] == "DiscreteMetaAction":
        return DiscreteMetaAction(env, **config)
    elif config["type"] == "MultiAgentAction":
        return MultiAgentAction(env, **config)
    else:
        raise ValueError("Unknown action type")

I referenced ContinuousAction and defined it as above, and also registered it in the action_factory.

  1. Definition of the vehicle:

highway_env/vehicle/controller.py

class IDMMDPVehicle(ControlledVehicle):
    """
    A vehicle using both a longitudinal and a lateral decision policies.

    - Longitudinal: the IDM model computes an acceleration given the preceding vehicle's distance and speed.
    - Lateral: the MOBIL model decides when to change lane by maximizing the acceleration of nearby vehicles.
    """

    # Longitudinal policy parameters
    ACC_MAX = 6.0  # [m/s2]
    """Maximum acceleration."""

    COMFORT_ACC_MAX = 3.0  # [m/s2]
    """Desired maximum acceleration."""

    COMFORT_ACC_MIN = -5.0  # [m/s2]
    """Desired maximum deceleration."""

    DISTANCE_WANTED = 5.0 + ControlledVehicle.LENGTH  # [m]
    """Desired jam distance to the front vehicle."""

    TIME_WANTED = 1.5  # [s]
    """Desired time gap to the front vehicle."""

    DELTA = 4.0  # []
    """Exponent of the velocity term."""

    DELTA_RANGE = [3.5, 4.5]
    """Range of delta when chosen randomly."""

    # Lateral policy parameters
    POLITENESS = 0.  # in [0, 1]
    LANE_CHANGE_MIN_ACC_GAIN = 0.2  # [m/s2]
    LANE_CHANGE_MAX_BRAKING_IMPOSED = 2.0  # [m/s2]
    LANE_CHANGE_DELAY = 1.0  # [s]

    def __init__(self,
                 road: Road,
                 position: Vector,
                 heading: float = 0,
                 speed: float = 0,
                 target_lane_index: int = None,
                 target_speed: float = None,
                 route: Route = None,
                 enable_lane_change: bool = True,
                 timer: float = None):
        super().__init__(road, position, heading, speed, target_lane_index, target_speed, route)
        self.enable_lane_change = enable_lane_change
        self.timer = timer or (np.sum(self.position)*np.pi) % self.LANE_CHANGE_DELAY

    def act(self, action: Union[dict, str] = None) -> None:
        """
        Perform a high-level action to change the desired lane or speed.

        - If a high-level action is provided, update the target speed and lane;
        - then, perform longitudinal and lateral control.

        :param action: a high-level action
        """
        print('action: ', action)

        # Lateral: MOBIL
        self.follow_road()

        # Longitudinal: IDM
        front_vehicle, rear_vehicle = self.road.neighbour_vehicles(self, self.lane_index)
        acceleration = self.acceleration(ego_vehicle=self,
                                                   front_vehicle=front_vehicle,
                                                   rear_vehicle=rear_vehicle,
                                                   COMFORT_ACC_MAX=action["COMFORT_ACC_MAX"],
                                                   target_velocity=action["target_velocity"])

        action = {}

        # action['acceleration'] = self.recover_from_stop(action['acceleration'])
        action['acceleration'] = acceleration
        action['acceleration'] = np.clip(action['acceleration'], -self.ACC_MAX, self.ACC_MAX)

        action["steering"] = self.steering_control(self.target_lane_index)
        action['steering'] = np.clip(action['steering'], -self.MAX_STEERING_ANGLE, self.MAX_STEERING_ANGLE)

        Vehicle.act(self, action)  # Skip ControlledVehicle.act(), or the command will be overriden.

    def step(self, dt: float):
        """
        Step the simulation.

        Increases a timer used for decision policies, and step the vehicle dynamics.

        :param dt: timestep
        """
        self.timer += dt
        super().step(dt)

    def acceleration(self,
                     ego_vehicle: ControlledVehicle,
                     front_vehicle: Vehicle = None,
                     rear_vehicle: Vehicle = None,
                     COMFORT_ACC_MAX: float = 3.0,
                     target_velocity: float = 10.0) -> float:
        """
        Compute an acceleration command with the Intelligent Driver Model.

        The acceleration is chosen so as to:
        - reach a target speed;
        - maintain a minimum safety distance (and safety time) w.r.t the front vehicle.

        :param ego_vehicle: the vehicle whose desired acceleration is to be computed. It does not have to be an
                            IDM vehicle, which is why this method is a class method. This allows an IDM vehicle to
                            reason about other vehicles behaviors even though they may not IDMs.
        :param front_vehicle: the vehicle preceding the ego-vehicle
        :param rear_vehicle: the vehicle following the ego-vehicle
        :return: the acceleration command for the ego-vehicle [m/s2]
        """
        print('COMFORT_ACC_MAX: {}, target_velocity: {}'.format(COMFORT_ACC_MAX, target_velocity))

        if not ego_vehicle or not isinstance(ego_vehicle, IDMMDPVehicle):
            return 0
        ego_target_speed = max(target_velocity, 0.0)
        if ego_vehicle.lane and ego_vehicle.lane.speed_limit is not None:
            ego_target_speed = np.clip(ego_target_speed, 0, ego_vehicle.lane.speed_limit)

        # a[1-(v/v_0)^δ]
        acceleration = COMFORT_ACC_MAX * (1 - np.power(
            max(ego_vehicle.speed, 0) / abs(utils.not_zero(ego_target_speed)), self.DELTA))

        # a[-(d^*/d)^2]
        if front_vehicle:
            d = ego_vehicle.lane_distance_to(front_vehicle)
            acceleration -= COMFORT_ACC_MAX * \
                np.power(self.desired_gap(ego_vehicle=ego_vehicle, front_vehicle=front_vehicle, COMFORT_ACC_MAX=COMFORT_ACC_MAX) / utils.not_zero(d), 2)
        return acceleration

    def desired_gap(self, ego_vehicle: Vehicle, front_vehicle: Vehicle = None, projected: bool = True, COMFORT_ACC_MAX: float = 3.0) -> float:
        """
        Compute the desired distance between a vehicle and its leading vehicle.

        :param ego_vehicle: the vehicle being controlled
        :param front_vehicle: its leading vehicle
        :param projected: project 2D velocities in 1D space
        :return: the desired distance between the two [m]
        """
        print('COMFORT_ACC_MAX: {}'.format(COMFORT_ACC_MAX))
        d0 = self.DISTANCE_WANTED
        tau = self.TIME_WANTED
        ab = -COMFORT_ACC_MAX * self.COMFORT_ACC_MIN if COMFORT_ACC_MAX > 0 else COMFORT_ACC_MAX * self.COMFORT_ACC_MIN
        dv = np.dot(ego_vehicle.velocity - front_vehicle.velocity, ego_vehicle.direction) if projected \
            else ego_vehicle.speed - front_vehicle.speed
        d_star = d0 + ego_vehicle.speed * tau + ego_vehicle.speed * dv / (2 * np.sqrt(ab))
        return d_star

I used the methods from IDMVehicle as mentioned above and modified it to receive COMFORT_ACC_MAX and target_velocity determined through the action.

For training and testing, I used the PPO algorithm from stable baselines3. When I run it, I encounter the following error:

import sys
sys.path.append('/Users/seominseok/content2/highway-env')

import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import PPO

import highway_env
highway_env.register_highway_envs()

from highway_env import *

TRAIN = True

if __name__ == "__main__":
    # Create the environment
    env = gym.make("intersection-merge-v0", render_mode="rgb_array")
    obs, info = env.reset()

    n_cpu = 6
    batch_size = 64

    # Create the model
    model = PPO(
        "MlpPolicy",
        env,
        policy_kwargs=dict(net_arch=[dict(pi=[256, 256], vf=[256, 256])]),
        n_steps=batch_size * 12 // n_cpu,
        batch_size=batch_size,
        n_epochs=10,    
        learning_rate=5e-4,
        gamma=0.8,
        verbose=2,
        tensorboard_log="highway_ppo/",
    )

    # Train the model
    if TRAIN:
        model.learn(total_timesteps=int(100))
        model.save("highway_ppo/model")
        del model

    # Run the trained model and record video
    model = PPO.load("highway_ppo/model", env=env)
    env = RecordVideo(
        env, video_folder="highway_ppo/videos", episode_trigger=lambda e: True
    )
    env.unwrapped.set_record_video_wrapper(env)
    env.configure({"simulation_frequency": 15})  # Higher FPS for rendering

    for videos in range(10):
        done = truncated = False
        obs, info = env.reset()
        while not (done or truncated):
            # Predict
            action, _states = model.predict(obs, deterministic=True)
            print('action: ', action)
            # Get reward
            obs, reward, done, truncated, info = env.step(action)
            # Render
            env.render()
    env.close()
Traceback (most recent call last):
  File "c:/Users/seominseok/content2/train_sb3.py", line 40, in <module>
    model.learn(total_timesteps=int(100))
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\ppo\ppo.py", line 315, in learn
    return super().learn(
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\common\on_policy_algorithm.py", line 277, in learn
    continue_training = self.collect_rollouts(self.env, callback, self.rollout_buffer, n_rollout_steps=self.n_steps)
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\common\on_policy_algorithm.py", line 194, in collect_rollouts
    new_obs, rewards, dones, infos = env.step(clipped_actions)
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\common\vec_env\base_vec_env.py", line 206, in step
    return self.step_wait()
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\common\vec_env\dummy_vec_env.py", line 58, in step_wait
    obs, self.buf_rews[env_idx], terminated, truncated, self.buf_infos[env_idx] = self.envs[env_idx].step(
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\stable_baselines3\common\monitor.py", line 94, in step
    observation, reward, terminated, truncated, info = self.env.step(action)
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\gymnasium\wrappers\order_enforcing.py", line 56, in step
    return self.env.step(action)
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\gymnasium\wrappers\env_checker.py", line 47, in step
    return env_step_passive_checker(self.env, action)
  File "C:\Users\seominseok\anaconda3\envs\sb3\lib\site-packages\gymnasium\utils\passive_env_checker.py", line 237, in env_step_passive_checker
    result = env.step(action)
  File "C:\Users/seominseok/content2/highway-env\highway_env\envs\intersection_merge_env.py", line 196, in step
    obs, reward, terminated, truncated, info = super().step(action)
  File "C:\Users/seominseok/content2/highway-env\highway_env\envs\common\abstract.py", line 237, in step
    self._simulate(action)
  File "C:\Users/seominseok/content2/highway-env\highway_env\envs\common\abstract.py", line 259, in _simulate
    self.road.act()
  File "C:\Users/seominseok/content2/highway-env\highway_env\road\road.py", line 355, in act
    vehicle.act()
  File "C:\Users/seominseok/content2/highway-env\highway_env\vehicle\controller.py", line 390, in act
    COMFORT_ACC_MAX=action["COMFORT_ACC_MAX"],
TypeError: 'NoneType' object is not subscriptable

What could be the problem? Thanks in advance for the response.

eleurent commented 9 months ago

The reason is the following: in general, the policy frequency is not the same as the simulation frequency. Typically, the agent will take high-level decision at a low frequency (e.g. changing the velocity setpoint, the target lane being followed, etc.), and then the vehicle will take low-level actions at a higher (simulation) frequency to execute the high-level actions.

At a low policy-frequency (e.g. 1Hz), the agent gets to call vehicle.act(action) with some action input.

And then during every simulation step (e.g. at 10Hz), we call for each vehicle:

vehicle.act()
vehicle.step()

where vehicle.act() will involve adjusting the acceleration and steering commands, and vehicle.step() integrates the dynamics.

So all you have to do is ensure that your IDMMDPVehicle handles vehicle.act() calls (which is the same as vehicle.act(action=None)) by storing the previous high-level action internally, such that it is reused at subsequent simulation steps. Something like:

    def act(self, action: Union[dict, str] = None) -> None:
        """
        Perform a high-level action to change the desired lane or speed.

        - If a high-level action is provided, update the target speed and lane;
        - then, perform longitudinal and lateral control.

        :param action: a high-level action
        """
        if action:
           self.last_action = action

        # Lateral: MOBIL
        self.follow_road()

        # Longitudinal: IDM
        front_vehicle, rear_vehicle = self.road.neighbour_vehicles(self, self.lane_index)
        acceleration = self.acceleration(ego_vehicle=self,
                                                   front_vehicle=front_vehicle,
                                                   rear_vehicle=rear_vehicle,
                                                   COMFORT_ACC_MAX=self.last_action["COMFORT_ACC_MAX"],
                                                   target_velocity=self.last_action["target_velocity"])

        action = {}

        # action['acceleration'] = self.recover_from_stop(action['acceleration'])
        action['acceleration'] = acceleration
        action['acceleration'] = np.clip(action['acceleration'], -self.ACC_MAX, self.ACC_MAX)

        action["steering"] = self.steering_control(self.target_lane_index)
        action['steering'] = np.clip(action['steering'], -self.MAX_STEERING_ANGLE, self.MAX_STEERING_ANGLE)

        Vehicle.act(self, action)  # Skip ControlledVehicle.act(), or the command will be overriden.
seominseok00 commented 9 months ago

I just checked now. It seems to be working well.

Thank you.