Farama-Foundation / Minigrid

Simple and easily configurable grid world environments for reinforcement learning
https://minigrid.farama.org/
Other
2.09k stars 604 forks source link

[Question] Warning raised with custom environment: The obs returned by the `reset()` method is not within the observation space #298

Closed JustinS6626 closed 1 year ago

JustinS6626 commented 1 year ago

Question

I am working on training a deep RL agent on a custom minigrid with randomly placed lava obstacles, and I have received the following warning in the debug log:

WARNING:py.warnings:/usr/local/lib/python3.8/dist-packages/gymnasium/utils/passive_env_checker.py:165: UserWarning: WARN: The obs returned by the reset() method is not within the observation space. logger.warn(f"{pre} is not within the observation space.")

I am concerned that this might be causing problems for the training process, and I am wondering how I might be able to fix it.

This is the code for my custom environment:

import numpy as np
from minigrid.core.grid import Grid
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Goal, Lava, Wall
from minigrid.minigrid_env import MiniGridEnv

def array_in_list(a, l):
    contained = False
    for item in l:
        if (a == item).all():
            contained = True
    return contained

class RandomLavaEnv(MiniGridEnv):
    def __init__(self, size, n_obstacles,
                 obstacle_type=Lava,
                 max_steps=None, **kwargs):
        self.obstacle_type = obstacle_type
        self.obstacles = []
        self.lava_list = []
        self.size = size
        self.goal_pos = np.array((self.size - 2, self.size - 2))

        if obstacle_type == Lava:
            mission_space = MissionSpace(mission_func=self._gen_mission_lava)
        else:
            mission_space = MissionSpace(mission_func=self._gen_mission)

        if max_steps is None:
            max_steps = 4 * size**2
        self.n_obstacles = n_obstacles
##        if n_obstacles != self.size:
##            self.n_obstacles = int(self.size)
##        else:
##            self.n_obstacles = n_obstacles

        super().__init__(mission_space=mission_space,
                         grid_size=size,
                         see_through_walls=False,
                         max_steps=max_steps,
                         **kwargs)
        self.reward_range=(-1, 1)

    @staticmethod
    def _gen_mission_lava():
        return "avoid lava and get to the green goal square"

    @staticmethod
    def _gen_mission():
        return "find the opening and get to the green goal square"

    def _gen_grid(self, width, height):
        assert width >= 5 and height >= 5
        self.grid = Grid(width, height)
        self.grid.wall_rect(0, 0, width, height)
        self.grid.set(width - 2, height - 2, Goal())
        self.agent_pos = np.array([1, 1])
        self.agent_dir = 0

        for i_obst in range(self.n_obstacles):
            self.obstacles.append(Lava())
            #self.place_obj(self.obstacles[i_obst], max_tries=100)
        retry = True
        while retry:
            for i_obst in range(self.n_obstacles):
                lava_x = self._rand_int(1, width - 2)
                if lava_x == 1:
                    lava_y = self._rand_int(2, height - 2)
                elif lava_x == width - 2:
                    lava_y = self._rand_int(1, height - 3)
                else:
                    lava_y = self._rand_int(1, height - 1)
                lava_spot = np.array([lava_y, lava_x])
                self.lava_list.append(lava_spot)
                valid_map = self.path_check(np.array(self.agent_pos), self.lava_list, [])
                if valid_map:
                    retry = False

        for i in range(self.n_obstacles):
            spot = self.lava_list[i]
            self.put_obj(self.obstacles[i], *spot)

            #self.put_obj(self.obstacles[i_obst], *lava_spot)
        self.mission = (
            "avoid the lava and get to the green goal square"
            if self.obstacle_type == Lava
            else "find the opening and get to the green goal square"
            )

    def path_check(self, coordinates, obstacles, visited):
        #print(coordinates)
        #print(visited)
        if array_in_list(coordinates, visited) or array_in_list(coordinates, obstacles):
            return False
        elif (coordinates == self.goal_pos).all():
            print("Found")
            return True
        else:
            visited.append(coordinates)
            if coordinates[0] == 1:
                if coordinates[1] == 1:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited)
                elif coordinates[1] == self.size - 2:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array((coordinates[0], coordinates[1] - 1)), obstacles, visited)
                else:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] - 1]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited)
            elif coordinates[0] == self.size - 2:
                if coordinates[1] == 1:
                    return self.path_check(np.array([coordinates[0] - 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited)
                else:
                    return self.path_check(np.array([coordinates[0] - 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] - 1]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited)
            else:
                if coordinates[1] == 1:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0] - 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited)
                elif coordinates[1] == self.size - 2:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0] - 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] - 1]), obstacles, visited)
                else:
                    return self.path_check(np.array([coordinates[0] + 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0] - 1, coordinates[1]]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] + 1]), obstacles, visited) or\
                           self.path_check(np.array([coordinates[0], coordinates[1] - 1]), obstacles, visited)

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        if action == self.actions.forward and not_clear:
            reward = -1
            terminated = True
            return obs, reward, terminated, truncated, info
        return obs, reward, terminated, truncated, info
pseudo-rnd-thoughts commented 1 year ago

@DarthMalloc Apologies for not replying this must have got lost in my notifications. Were you able to solve this?