openai / baselines

OpenAI Baselines: high-quality implementations of reinforcement learning algorithms

SubProcVecEnv raises ConnectionResetError: [Errno 104] Connection reset by peer #1075

Open MakisKans opened 4 years ago

MakisKans commented 4 years ago

I'm trying to run the following code to test PPO with Sonic the Hedgehog, running it in parallel with SubprocVecEnv. Unfortunately I run into the following error:

Traceback (most recent call last):

  File "<ipython-input-2-9712559f6750>", line 3, in <module>
    env = SubprocVecEnv([foo(game,state) for _ in range(num_cpu)])

  File "/home/chryskan/anaconda3/lib/python3.7/site-packages/stable_baselines/common/vec_env/subproc_vec_env.py", line 96, in __init__
    observation_space, action_space = self.remotes[0].recv()

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)

  File "/home/chryskan/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)

ConnectionResetError: [Errno 104] Connection reset by peer

This is the code:

from stable_baselines.common.policies import MlpPolicy, CnnPolicy
from stable_baselines.common import make_vec_env
from stable_baselines import PPO2
from my_wrappers import wrap_sonic, make_sonic
from stable_baselines.common.env_checker import check_env
from stable_baselines.common.vec_env import SubprocVecEnv

def foo(game, state):
    def bar():
        return wrap_sonic(make_sonic(game, state), episode_life=False,
                          frame_stack=True, scale=True,
                          reward_scale=True, sonic_actions=True,
                          max_x_reward=True)
    return bar

game , state = "SonicTheHedgehog-Genesis", "GreenHillZone.Act1"
num_cpu = 4
env = SubprocVecEnv([foo(game,state) for _ in range(num_cpu)])

model = PPO2(CnnPolicy, env, verbose=1)
model.learn(total_timesteps=1000_000)

And here is the my_wrappers module:

import numpy as np
from collections import deque
import gym
from gym import spaces
import cv2
from retro_contest.local import make

height, width = 84, 84

class ActionsDiscretizer(gym.ActionWrapper):
    def __init__(self, env):
        super(ActionsDiscretizer, self).__init__(env)
        buttons = ["B", "A", "MODE", "START", "UP", "DOWN", "LEFT", 
                   "RIGHT", "C", "Y", "X", "Z"]
        actions = [['LEFT'], ['RIGHT'], ['LEFT', 'DOWN'], 
                   ['RIGHT', 'DOWN'], ['DOWN'],
                   ['DOWN', 'B'], ['B']]
        self._actions = []

        for action in actions:
            arr = np.array([False] * 12)
            for button in action:
                arr[buttons.index(button)] = True
            self._actions.append(arr)
        self.action_space = gym.spaces.Discrete(len(self._actions))

    def action(self, a):
        return self._actions[a].copy()

class RewardScaler(gym.RewardWrapper):
    """
    Bring rewards to a reasonable scale for PPO.
    This is incredibly important and affects performance
    drastically.
    """
    def reward(self, reward):
        return reward * 0.01

class AllowBacktracking(gym.Wrapper):
    """
    Use deltas in max(X) as the reward, rather than deltas
    in X. This way, agents are not discouraged too heavily
    from exploring backwards if there is no way to advance
    head-on in the level.
    """
    def __init__(self, env):
        super(AllowBacktracking, self).__init__(env)
        self._cur_x = 0
        self._max_x = 0

    def reset(self, **kwargs):
        self._cur_x = 0
        self._max_x = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        self._cur_x += rew
        rew = max(0, self._cur_x - self._max_x)
        self._max_x = max(self._max_x, self._cur_x)
        return obs, rew, done, info

class NoopResetEnvSonic(gym.Wrapper):
    def __init__(self, env, noop_max=30):
        """Sample initial states by taking random number of no-ops on reset.
        No-op is assumed to be action 0.
        """
        gym.Wrapper.__init__(self,env)
        self.noop_max = noop_max
        self.override_num_noops = None
        self.noop_action = np.zeros(env.action_space.n)

    def reset(self, **kwargs):
        """Do no-op action for a number of steps in [1, noop_max]."""
        self.env.reset(**kwargs)
        if self.override_num_noops is not None:
            noops = self.override_num_noops
        else:
            noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
        assert noops > 0
        obs = None
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        """Return only every `skip`-th frame"""
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype='uint8')
        self._skip = skip

    def step(self, action):
        """Repeat action, sum reward, and max over last observations."""        
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i==self._skip - 2: self._obs_buffer[0] = obs
            if i==self._skip - 1: self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        # Note that the observation on the done=True frame
        # doesn't matter
        max_frame = self._obs_buffer.max(axis=0)

        return max_frame, total_reward, done, info

class FireResetEnv(gym.Wrapper):
    def __init__(self, env):
        """Take action on reset for environments that are fixed until firing."""
        gym.Wrapper.__init__(self, env)
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
        assert len(env.unwrapped.get_action_meanings()) >= 3

    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(1)
        if done:
            self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(2)
        if done:
            self.env.reset(**kwargs)
        return obs

class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        """Make end-of-life == end-of-episode, but only reset on true game over.
        Done by DeepMind for the DQN and co. since it helps value estimation.
        """
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done  = True

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # check current lives, make loss of life terminal,
        # then update lives to handle bonus lives
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            # for Qbert sometimes we stay in lives == 0 condition for a few frames
            # so it's important to keep lives > 0, so that we only reset once
            # the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, info

    def reset(self, **kwargs):
        """Reset only when lives are exhausted.
        This way all states are still reachable even though lives are episodic,
        and the learner need not know about any of this behind-the-scenes.
        """
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            # no-op step to advance from terminal/lost life state
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs

class ClipRewardEnv(gym.RewardWrapper):
    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        return np.sign(reward)

class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env):
        """Warp frames to 84x84 as done in the Nature paper and later work."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = width
        self.height = height
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.height, self.width, 1), dtype=np.uint8)

    def observation(self, frame):
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
        return frame[:, :, None]

class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack k last frames.
        Returns lazy array, which is much more memory efficient.
        See Also
        --------
        baselines.common.atari_wrappers.LazyFrames
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], shp[2] * k), dtype=np.uint8)

    def reset(self):
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        # Concatenate along the channel axis; reshaping the (k, H, W, 1) stack
        # to (H, W, k) would scramble pixels rather than stacking frames.
        return np.concatenate(self.frames, axis=2)

class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, observation):
        # careful! This undoes the memory optimization, use
        # with smaller replay buffers only.
        return np.array(observation).astype(np.float32) / 255.0

class LazyFrames(object):
    def __init__(self, frames):
        """This object ensures that common frames between the observations are only stored once.
        It exists purely to optimize memory usage which can be huge for DQN's 1M frames replay
        buffers.
        This object should only be converted to numpy array before being passed to the model.
        You'd not believe how complex the previous solution was."""
        self._frames = frames

    def __array__(self, dtype=None):
        out = np.concatenate(self._frames, axis=2)
        if dtype is not None:
            out = out.astype(dtype)
        return out

def make_sonic(game, state):
    env = make(game=game, state=state)
    #env = retrowrapper.RetroWrapper(game=game, state=state)
    env = NoopResetEnvSonic(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    return env

def wrap_sonic(env, episode_life=True, clip_rewards=False, 
                  frame_stack=False, scale=False, pytorch_img=False,
                  reward_scale = True, sonic_actions=True, max_x_reward=True):
    if episode_life:
        env = EpisodicLifeEnv(env)

    key = 'get_action_meanings'
    if hasattr(env.unwrapped, key):
        meanings = env.unwrapped.get_action_meanings()
    else:
        meanings = [env.unwrapped.get_action_meaning(np.zeros(a)) 
                        for a in range(env.action_space.n)]
    if 'FIRE' in meanings:
        env = FireResetEnv(env)
    env = WarpFrame(env)
    if scale:
        env = ScaledFloatFrame(env)
    if max_x_reward:
        env = AllowBacktracking(env)
    if sonic_actions:
        env = ActionsDiscretizer(env)
    if clip_rewards:
        env = ClipRewardEnv(env)
    if reward_scale:
        env = RewardScaler(env)
    if frame_stack:
        env = FrameStack(env, 4)
    return env
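
A note for anyone debugging this: the ConnectionResetError raised at self.remotes[0].recv() usually just means the worker process died while constructing the environment, and the real exception is raised inside the child and never printed. A minimal sketch for surfacing it, reusing the foo, game, state and num_cpu names from the snippet above, is to build one environment directly in the main process, or to run everything single-process with DummyVecEnv, before switching back to SubprocVecEnv:

from stable_baselines.common.vec_env import DummyVecEnv

# Build one env directly; any error in make_sonic/wrap_sonic is raised
# here with a full traceback instead of silently killing a worker.
test_env = foo(game, state)()
test_env.reset()
test_env.close()

# Or run the whole pipeline single-process while debugging:
env = DummyVecEnv([foo(game, state) for _ in range(num_cpu)])

As later comments in this thread show, the underlying cause is often unrelated to multiprocessing itself (a missing Atari ROM, a broken custom env) and only becomes visible once the error is raised in the main process.
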
fnuabhimanyu commented 4 years ago

I was facing a similar issue, but I solved it by removing verbose. Try removing verbose=1 from model = PPO2(CnnPolicy, env, verbose=1).
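
That is, the suggested call becomes the one below (a sketch of the change; verbose only controls how much logging PPO2 prints, so whether this helps likely depends on the rest of the setup):

model = PPO2(CnnPolicy, env)  # same call as above, with verbose=1 dropped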

HongminWu commented 4 years ago

Still struggling with this issue, can anyone help me out?

cmal commented 4 years ago

Same problem. Does this have an answer now?

bamasa commented 3 years ago

Same

harsh-ux commented 3 years ago

Facing the same problem. Does anyone have a solution to this?

Open-AGI commented 2 years ago

/root/miniconda3/lib/python3.7/multiprocessing/connection.py:379: ConnectionResetError
----------------------------------------------------------------------------- Captured stderr call ------------------------------------------------------------------------------
Process SpawnProcess-2:
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/root/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/root/virtualenv/venv/baselines/baselines/common/vec_env/subproc_vec_env.py", line 15, in worker
    envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
  File "/root/virtualenv/venv/baselines/baselines/common/vec_env/subproc_vec_env.py", line 15, in <listcomp>
    envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
  File "/root/virtualenv/venv/baselines/baselines/common/tests/test_env_after_learn.py", line 15, in make_env
    env = gym.make('CartPole-v1' if algo == 'acktr' else 'PongNoFrameskip-v4')
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 156, in make
    return registry.make(id, **kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 101, in make
    env = spec.make(**kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/registration.py", line 73, in make
    env = cls(**_kwargs)
  File "/root/miniconda3/lib/python3.7/site-packages/gym/envs/atari/atari_env.py", line 49, in __init__
    self.game_path = atari_py.get_game_path(game)
  File "/root/miniconda3/lib/python3.7/site-packages/atari_py/games.py", line 20, in get_game_path
    raise Exception('ROM is missing for %s, see https://github.com/openai/atari-py#roms for instructions' % (game_name,))
Exception: ROM is missing for pong, see https://github.com/openai/atari-py#roms for instructions
__________________________________________________________________________ test_env_after_learn[deepq] __________________________________________________________________________

algo = 'deepq'

    @pytest.mark.parametrize('algo', algos)
    def test_env_after_learn(algo):
        def make_env():
            # acktr requires too much RAM, fails on travis
            env = gym.make('CartPole-v1' if algo == 'acktr' else 'PongNoFrameskip-v4')
            return env

        make_session(make_default=True, graph=tf.Graph())
>       env = SubprocVecEnv([make_env])

baselines/common/tests/test_env_after_learn.py:19: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
baselines/common/vec_env/subproc_vec_env.py:71: in __init__
    observation_space, action_space, self.spec = self.remotes[0].recv().x
/root/miniconda3/lib/python3.7/multiprocessing/connection.py:250: in recv
    buf = self._recv_bytes()
/root/miniconda3/lib/python3.7/multiprocessing/connection.py:407: in _recv_bytes
    buf = self._recv(4)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <multiprocessing.connection.Connection object at 0x7f4be43769b0>, size = 4, read = <built-in function read>

    def _recv(self, size, read=_read):
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
>           chunk = read(handle, remaining)
E           ConnectionResetError: [Errno 104] Connection reset by peer

/root/miniconda3/lib/python3.7/multiprocessing/connection.py:379: ConnectionResetError

I noticed this error:

 raise Exception('ROM is missing for %s, see https://github.com/openai/atari-py#roms for instructions' % (game_name,))

When I solved this error, the ConnectionResetError: [Errno 104] Connection reset by peer disappeared, but there is one error left.
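
(For anyone hitting the same ROM error: the exception message itself points at https://github.com/openai/atari-py#roms for instructions. A quick sanity check after importing the ROMs, reusing the same atari_py call that fails in the captured traceback above:)

import atari_py

# Raises "Exception: ROM is missing for pong ..." while the ROMs are still
# missing; returns the path to the ROM once they have been imported.
print(atari_py.get_game_path('pong'))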

===================================================== 46 failed, 49 passed, 31 skipped, 1111 warnings in 110.05s (0:01:50) ======================================================

The error becomes:

====================================================== 1 failed, 94 passed, 31 skipped, 6368 warnings in 269.86s (0:04:29) ======================================================
baselines/common/tests/test_doc_examples.py:20: in <lambda>
    venv = DummyVecEnv([lambda: cmd_util.make_mujoco_env('Reacher-v2', seed=0)])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

env_id = 'Reacher-v2', seed = 0, reward_scale = 1.0

    def make_mujoco_env(env_id, seed, reward_scale=1.0):
        """
        Create a wrapped, monitored gym.Env for MuJoCo.
        """
>       rank = MPI.COMM_WORLD.Get_rank()
E       AttributeError: 'NoneType' object has no attribute 'COMM_WORLD'

baselines/common/cmd_util.py:112: AttributeError

My environment(for your information):

┌──(root💀agi)-[~]
└─# uname -a                   
Linux agi 5.14.0-kali4-amd64 #1 SMP Debian 5.14.16-1kali1 (2021-11-05) x86_64 GNU/Linux

┌──(venv)─(root💀agi)-[~/virtualenv/venv/baselines]
└─# pip list                                                                                                                                                               130 ⨯
Package                Version    Location                       
---------------------- ---------- -------------------------------
absl-py                1.0.0      
asn1crypto             0.24.0     
astor                  0.8.1      
atari-py               0.2.9      
attrs                  21.2.0     
baselines              0.1.6      /root/virtualenv/venv/baselines
box2d-py               2.3.8      
cached-property        1.5.2      
certifi                2021.10.8  
cffi                   1.12.2     
chardet                3.0.4      
click                  8.0.3      
cloudpickle            1.2.2      
conda                  4.10.3     
conda-package-handling 1.7.3      
cryptography           2.6.1      
cycler                 0.11.0     
Cython                 0.29.26    
filelock               3.4.0      
fonttools              4.28.5     
future                 0.18.2     
gast                   0.5.3      
glfw                   2.5.0      
google-pasta           0.2.0      
grpcio                 1.43.0     
gym                    0.15.7     
h5py                   3.6.0      
idna                   2.8        
imageio                2.13.4     
importlib-metadata     4.10.0     
iniconfig              1.1.1      
joblib                 1.1.0      
Keras-Applications     1.0.8      
Keras-Preprocessing    1.1.2      
kiwisolver             1.3.2      
lockfile               0.12.2     
Markdown               3.3.6      
matplotlib             3.5.1      
mujoco-py              1.50.1.68  
numpy                  1.21.5     
opencv-python          4.5.4.60   
packaging              21.3       
pandas                 1.3.5      
Pillow                 8.4.0      
pip                    19.0.3     
pluggy                 1.0.0      
protobuf               3.19.1     
py                     1.11.0     
pycosat                0.6.3      
pycparser              2.19       
pyglet                 1.5.0      
pyOpenSSL              19.0.0     
pyparsing              3.0.6      
PySocks                1.6.8      
pytest                 6.2.5      
python-dateutil        2.8.2      
pytz                   2021.3     
requests               2.21.0     
ruamel-yaml            0.15.46    
scipy                  1.7.3      
setuptools             41.0.0     
six                    1.12.0     
tensorboard            1.14.0     
tensorflow             1.14.0     
tensorflow-estimator   1.14.0     
termcolor              1.1.0      
toml                   0.10.2     
tqdm                   4.62.3     
typing-extensions      4.0.1      
urllib3                1.24.1     
Werkzeug               2.0.2      
wheel                  0.33.1     
wrapt                  1.13.3     
youtube-dl             2021.12.17 
zipp                   3.6.0      
Mickeyyyang commented 2 years ago

same error...

Vaillus commented 1 year ago

same error

kvrban commented 1 year ago

Same issue when I try to use SubprocVecEnv on Colab.

ZhaoRunyi commented 5 months ago

Same issue, and I know it's related to my custom env, because the error does not occur when I switch to another env.
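
If it is a custom env, one way to get at the real problem (a sketch assuming stable-baselines as in the original post, which has the same env checker as stable-baselines3; the hypothetical MyCustomEnv stands in for your own class) is to run the env checker and a manual step in the main process, before wrapping the env in SubprocVecEnv, so the exception is not swallowed by a worker:

from stable_baselines.common.env_checker import check_env

env = MyCustomEnv()   # hypothetical: replace with your own env constructor
check_env(env)        # validates the observation/action spaces and reset/step
obs = env.reset()
obs, reward, done, info = env.step(env.action_space.sample())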

PostResPerditas commented 1 month ago

for the " 'NoneType' object has no attribute 'COMM_WORLD' " error, solve it using 'pip install mpi4py==4.0.1'. Just a lack of dependency module error.