openai / gym

A toolkit for developing and comparing reinforcement learning algorithms.
https://www.gymlibrary.dev

[Bug Report] AsyncVectorEnv gets TypeError #2883

Closed DDDOH closed 2 years ago

DDDOH commented 2 years ago

When initializing a custom (self-defined) environment with AsyncVectorEnv,

self.envs = gym.vector.AsyncVectorEnv([lambda: gym.make(
            id=args.env_name, traj_len=self.args.max_episode_steps) for _ in range(self.args.num_processes)])

the following error is raised:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/site-packages/gym/vector/async_vector_env.py", line 164, in __init__
    process.start()
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/site-packages/gym/vector/utils/misc.py", line 19, in __getstate__
    return cloudpickle.dumps(self.fn)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/shuffleofficial/opt/anaconda3/envs/sacbad/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object

System Info: Python 3.10 in an Anaconda environment on macOS (from the traceback paths above).

I'll upload the environment definition code if necessary.

arjun-kg commented 2 years ago

It's likely a problem in the environment definition. When you use AsyncVectorEnv, each environment factory is serialized with cloudpickle and sent to another process, and something in yours is failing to serialize. I'm not sure what exactly is wrong; posting the environment definition would help.
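
You can verify this outside the vector-env machinery. The traceback shows that AsyncVectorEnv wraps each factory in a CloudpickleWrapper whose __getstate__ calls cloudpickle.dumps, so the same failure can be reproduced directly (a minimal sketch; the env id and kwargs are placeholders):

import cloudpickle
import gym

def make_env():
    # placeholder id/kwargs; substitute the actual environment here
    return gym.make(id="AntDir-v0", traj_len=200)

# this mirrors what CloudpickleWrapper.__getstate__ does before spawning
# workers; it raises TypeError if the closure captures an unpicklable object
cloudpickle.dumps(make_env)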

DDDOH commented 2 years ago

I partially solved the issue, with the help of the comment https://github.com/openai/gym/issues/2883#issuecomment-1153180165, by moving the AsyncVectorEnv call into a separate file. Details below.

This is my environment definition.

'''
Inherits from https://github.com/openai/gym/blob/master/gym/envs/mujoco/ant_v4.py
and rewrites step and reset_model to support nonstationary tasks.
'''

import numpy as np
from gym.envs.mujoco.ant_v4 import AntEnv
import gym

class AntDir(AntEnv):
    def __init__(self, traj_len, id=None):
        # AntEnv.__init__ runs gym.envs.mujoco.mujoco_env's init, which probes
        # the observation space by calling step once:
        #
        #     observation, _reward, done, _info = self.step(action)
        #     assert not done
        #     self._set_observation_space(observation)
        #
        # during that probe we only need `done` to be False and an observation
        # of the right shape, so this flag keeps step() inert until AntEnv is
        # fully initialized
        self.init_parent = False
        AntEnv.__init__(self)
        self.init_parent = True
        self.id = id  # distinguishes environments created inside a VectorEnv
        self.traj_len = traj_len
        self.dim_context = 1

    # step is rewritten to recompute the reward from the per-step goal context

    def step(self, action):
        if self.init_parent:
            xy_position_before = self.get_body_com("torso")[:2].copy()
            self.do_simulation(action, self.frame_skip)
            xy_position_after = self.get_body_com("torso")[:2].copy()

            agent_velocity = (xy_position_after - xy_position_before) / self.dt

            context = self.traj_context[self.traj_step]  # per-step goal angle (radians)

            goal_direction = np.array([np.cos(context), np.sin(context)])
            forward_velocity = np.dot(agent_velocity, goal_direction)
            # component of the velocity perpendicular to the goal direction in
            # the xy-plane ("vertical" here means orthogonal, not along z)
            vertical_velocity = np.sqrt(
                agent_velocity[0]**2 + agent_velocity[1]**2 - forward_velocity**2)
            forward_reward = forward_velocity

            # forward_reward = x_velocity
            healthy_reward = self.healthy_reward

            rewards = forward_reward - 0.5 * vertical_velocity + healthy_reward

            costs = ctrl_cost = self.control_cost(action)

            done = self.done
            observation = self._get_obs()
            info = {
                "reward_forward": forward_reward,
                "reward_ctrl": -ctrl_cost,
                "reward_survive": healthy_reward,
                "x_position": xy_position_after[0],
                "y_position": xy_position_after[1],
                # "distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
                # "x_velocity": x_velocity,
                # "y_velocity": y_velocity,
            }
            if self._use_contact_forces:
                contact_cost = self.contact_cost
                costs += contact_cost
                info["reward_ctrl"] = -contact_cost

            reward = rewards - costs

            self.traj_step += 1

            if self.traj_step == self.traj_len:
                done = True

            # 6/11: the new render API is not supported yet
            # self.renderer.render_step()
            return observation, reward, done, info
        else:
            # reached only during AntEnv.__init__'s probe step: return a dummy
            # transition with done=False so the parent init can proceed
            observation = self._get_obs()
            done = False

            return observation, 0, done, {}

    # reset_model keeps AntEnv's noisy state initialization; the trajectory context is set in reset below

    def reset_model(self):
        noise_low = -self._reset_noise_scale
        noise_high = self._reset_noise_scale

        qpos = self.init_qpos + self.np_random.uniform(
            low=noise_low, high=noise_high, size=self.model.nq
        )
        qvel = (
            self.init_qvel
            + self._reset_noise_scale *
            self.np_random.standard_normal(self.model.nv)
        )
        self.set_state(qpos, qvel)

        observation = self._get_obs()

        return observation

    def reset(self, options, seed=None):
        # note: gym's standard signature is reset(self, *, seed=None, options=None);
        # options is required here because every episode needs a trajectory context.
        # super(AntEnv, self) skips AntEnv and resolves to MujocoEnv.reset,
        # which calls reset_model above
        observation = super(AntEnv, self).reset(seed=seed)
        traj_context = options['traj_context'][self.id]
        assert self.traj_len == len(
            traj_context), "traj_context length must be equal to max_episode_steps"
        self.traj_context = traj_context
        self.traj_step = 0
        return observation

    @staticmethod
    def get_context_distribution():
        # returns a function that samples a sequence of context angles
        goal_direction_base = np.random.uniform(0, np.pi * 2)

        def get_context(length):
            # record a human-readable description of the sampling distribution
            get_context.message = "np.random.normal({}, 0.1, length)".format(
                goal_direction_base)
            return np.random.normal(goal_direction_base, 0.1, length)

        return get_context
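
For context, gym.make(id=..., traj_len=...) only works once the class is registered. A sketch of such a registration (the id and module path are placeholders, not taken from this post):

# hypothetical registration; gym.make forwards extra kwargs (here traj_len)
# to the environment constructor
import gym
from gym.envs.registration import register

register(
    id="AntDir-v0",
    entry_point="ant_dir:AntDir",  # assumes the class lives in ant_dir.py
)

env = gym.make(id="AntDir-v0", traj_len=200)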

Initially, I used AsyncVectorEnv inside a rather long piece of code that does A LOT of stuff (and may hold some unpicklable objects), and got the TypeError: cannot pickle '_thread.lock' object error. I conjecture that AsyncVectorEnv pickles not only the objects defined inside the environment class, but also whatever the env-factory lambdas capture from the enclosing scope: in my snippet above, the lambda references args and self, so cloudpickle tries to serialize the whole surrounding object along with any unpicklable attributes it holds.
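
A self-contained sketch of that capture mechanism (the Trainer class here is hypothetical, not from my code):

import threading
import cloudpickle

class Trainer:
    def __init__(self):
        self.lock = threading.Lock()   # unpicklable attribute
        self.max_episode_steps = 200

    def bad_factory(self):
        # the lambda closes over `self`, so cloudpickle must serialize the
        # whole Trainer instance, lock included
        return lambda: self.max_episode_steps

    def good_factory(self):
        steps = self.max_episode_steps  # copy the value into a local first
        return lambda: steps            # now the closure holds a plain int

t = Trainer()
cloudpickle.dumps(t.good_factory())  # works
cloudpickle.dumps(t.bad_factory())   # TypeError: cannot pickle '_thread.lock' object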

By moving

self.envs = gym.vector.AsyncVectorEnv([lambda: gym.make(
            id=args.env_name, traj_len=self.args.max_episode_steps) for _ in range(self.args.num_processes)])

into a separate file, like the following one:

# in file get_env.py
import gym

def get_envs(env_name, max_episode_steps, num_processes):
    # the lambdas here close over plain str/int arguments only, so
    # cloudpickle can serialize them without dragging in `self`
    return gym.vector.AsyncVectorEnv([lambda: gym.make(
        id=env_name, traj_len=max_episode_steps) for _ in range(num_processes)])

Then, wherever you previously constructed the vector env, call get_env.get_envs(env_name, max_episode_steps, num_processes) instead.
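
For example, the original call site becomes (a sketch reusing the attribute names from my first snippet):

import get_env

self.envs = get_env.get_envs(
    args.env_name, self.args.max_episode_steps, self.args.num_processes)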

I'm not exactly sure why this removes the error (presumably the standalone function closes over only its own picklable arguments rather than over self), but if this workaround is correct, I suggest adding a note to the official documentation telling users to construct AsyncVectorEnv in a separate, simple file.

arjun-kg commented 2 years ago

As you mention, this environment works fine on its own. Could you provide more context about the long piece of code you mention? Source code would be great, but if not, any information that would help us reproduce the error.

DDDOH commented 2 years ago

I'll provide a minimal set of code that can reproduce this error later.

jkterry1 commented 2 years ago

Hey, I'm going to close this due to inactivity. Please let us know if you need additional help.