DLR-RM / stable-baselines3

PyTorch version of Stable Baselines, reliable implementations of reinforcement learning algorithms.
https://stable-baselines3.readthedocs.io
MIT License

Pybullet SubprocVecEnv Multiprocessing leads to Broken Pipe Error #509

Closed. swagatk closed this issue 3 years ago.

swagatk commented 3 years ago

Dear All,

I am new to SB3. I have been able to run a few basic examples. Recently, I trained KukaDiverseObjectEnv successfully with the SAC algorithm. Now I want to run multiple environments using SubprocVecEnv, but it did not work for me. I want to share my experience here and hopefully someone will be able to help me fix this bug. I am running all of this code on Google Colab.

I am using a Gym Wrapper to convert the observation images from channel-last format to channel-first format and normalize the pixels between 0 and 1. My wrapper is as follows:

import gym
import numpy as np

class NormalizeObsvnWrapper(gym.Wrapper):
  """
  :param env: (gym.Env)   Gym environment that will be wrapped
  """
  def __init__(self, env):
    assert isinstance(env.observation_space, gym.spaces.Box),\
     "Valid for continuous observation spaces of type gym.spaces.Box"

    self._height = env.observation_space.shape[0]
    self._width = env.observation_space.shape[1]
    self._channels = env.observation_space.shape[2]

    # the wrapper returns float32 images in [0, 1], channel-first,
    # so declare the observation space accordingly
    env.observation_space = gym.spaces.Box(low=0, high=1.0,
                                            shape=(self._channels,
                                                   self._height,
                                                   self._width),
                                            dtype=np.float32)
    env.reward_range = (-np.inf, np.inf)
    # call the parent constructor so that we can access self.env
    super(NormalizeObsvnWrapper, self).__init__(env)

  def _modify_obsvn(self, obs):
    new_obs = np.transpose(obs, (2, 0, 1))
    new_obs = np.asarray(new_obs, dtype=np.float32) / 255.0
    return new_obs

  def reset(self):
    """
    Convert Images from HxWxC format to CxHxW
    Normalize the pixels between 0 and 1.0
    """
    return self._modify_obsvn(self.env.reset())

  def step(self, action):
    obs, reward, done, info = self.env.step(action)
    new_obs = self._modify_obsvn(obs)
    info['channel_first'] = True
    info['normalized_pixels'] = True
    return new_obs, reward, done, info
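
A quick way to confirm the wrapper behaves as intended is to inspect a single observation. A minimal sketch, assuming KukaDiverseObjectEnv's default 48x48x3 RGB observations:

from pybullet_envs.bullet.kuka_diverse_object_gym_env import KukaDiverseObjectEnv

# wrap the env and inspect one reset observation
env = NormalizeObsvnWrapper(KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False,
                                                 renders=False, removeHeightHack=False))
obs = env.reset()
print(obs.shape, obs.dtype, obs.min(), obs.max())
# expected: (3, 48, 48) float32, with values in [0.0, 1.0]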

I also use a custom CNN (CustomCNN) to extract features from the input images.

import gym
import torch as th

from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class CustomCNN(BaseFeaturesExtractor):
  """
  :param observation_space: (gym.space)
  :param features_dim: (int) number of features extracted. This corresponds to
        the number of units in the last layer
  """
  def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
    super(CustomCNN, self).__init__(observation_space, features_dim)
    # We assume CxHxW images (channels first format)
    n_input_channels = observation_space.shape[0]
    self.cnn = th.nn.Sequential(
        th.nn.Conv2d(n_input_channels, 16, kernel_size=2, stride=2, padding=0),
        th.nn.ReLU(),
        th.nn.BatchNorm2d(16),
        th.nn.Conv2d(16, 32, kernel_size=2, stride=2, padding=0),
        th.nn.ReLU(),
        th.nn.BatchNorm2d(32),
        th.nn.Conv2d(32, 32, kernel_size=2, stride=2, padding=0),
        th.nn.ReLU(),
        th.nn.BatchNorm2d(32),
        th.nn.Flatten()
    )

    # compute shape by doing one forward pass
    with th.no_grad():
      n_flatten = self.cnn(
          th.as_tensor(observation_space.sample()[None]).float()
      ).shape[1]

    self.linear = th.nn.Sequential(
        th.nn.Linear(n_flatten, 128), 
        th.nn.ReLU(),
        th.nn.Linear(128, 128),
        th.nn.ReLU(),
        th.nn.Linear(128, features_dim),
        th.nn.ReLU()
        ) 

  def forward(self, observations: th.Tensor) -> th.Tensor:
    return self.linear(self.cnn(observations))
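
To verify the architecture, one can run a forward pass on a dummy batch. A minimal sketch, assuming the 3x48x48 channel-first observations produced by the wrapper above (the default KukaDiverseObjectEnv image size):

import gym
import numpy as np
import torch as th

# dummy channel-first observation space matching the wrapped Kuka env
# (assumption: 3x48x48 float32 images in [0, 1])
obs_space = gym.spaces.Box(low=0, high=1.0, shape=(3, 48, 48), dtype=np.float32)
extractor = CustomCNN(obs_space, features_dim=64)

# forward a single dummy observation through the extractor
dummy_obs = th.as_tensor(obs_space.sample()[None])
print(extractor(dummy_obs).shape)  # expected: torch.Size([1, 64])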

Now I can train a single environment as follows:


import pybullet as p
from pybullet_envs.bullet.kuka_diverse_object_gym_env import KukaDiverseObjectEnv
from stable_baselines3 import SAC
from stable_baselines3.common.monitor import Monitor

p.connect(p.DIRECT)

# create the environment
env = NormalizeObsvnWrapper(KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
                          removeHeightHack=False))
env = Monitor(env, monitor_path)

policy_kwargs = dict(
    features_extractor_class = CustomCNN,
    features_extractor_kwargs = dict(features_dim=64),
    net_arch = dict(qf=[128, 64, 32], pi=[128, 64, 64])
)
# create RL model
model = SAC('CnnPolicy', env, buffer_size=100000, batch_size=256,
            policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)

# train the model: 50K time steps is adequate
%time model.learn(total_timesteps=50000, log_interval=4, tb_log_name='kuka_sac')

p.disconnect()

This is how the training looks on TensorBoard: [screenshot of the training curves]

Now I want to run 2 environments simultaneously using SubprocVecEnv. First I define the make_env function as follows:

import gym
from typing import Callable
from stable_baselines3.common.utils import set_random_seed

def make_env(env_id, rank: int, seed: int = 0) -> Callable:
  """
  Utility function for a multiprocessed env.

  :param env_id: (gym.Env or str) gym environment instance or registered id
  :param rank: (int) index of the subprocess
  :param seed: (int) initial seed for the RNG
  """
  def _init() -> gym.Env:
    if isinstance(env_id, gym.Env):
      env = env_id
    elif isinstance(env_id, str):
      env = gym.make(env_id)
    else:
      raise ValueError('Invalid environment id')
    env.seed(seed + rank)
    return env
  set_random_seed(seed)
  return _init

My main training program is as follows:

import pybullet as p
import gym
import numpy as np
from datetime import datetime
from pybullet_envs.bullet.kuka_diverse_object_gym_env import KukaDiverseObjectEnv
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.monitor import Monitor

p.connect(p.DIRECT)
# Create an instance of the Gym Environment
env_id = NormalizeObsvnWrapper(KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
                          removeHeightHack=False))

# I also tried without the wrapper, without any success
# env_id = KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
#                           removeHeightHack=False)

# create vectorized environment
num_cpu = 2
env = SubprocVecEnv([make_env(env_id, i) for i in range(num_cpu)])
#env = DummyVecEnv([make_env(env_id, i) for i in range(num_cpu)])  # this does not work either

# Monitor the vectorized environment 
env = Monitor(env, monitor_path)  

# custom policy parameters
policy_kwargs = dict(
    features_extractor_class = CustomCNN,
    features_extractor_kwargs = dict(features_dim=64),
    net_arch = dict(qf=[128, 64, 32], pi=[128, 64, 64])
)

# create RL model
model = SAC('CnnPolicy', env, buffer_size=70000, batch_size=256,
            policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)

# train the model
begin = datetime.now()
model.learn(total_timesteps=50000, log_interval=4, tb_log_name='kuka_sac_mp')
end = datetime.now()
print('Training time: ', end - begin)
p.disconnect(p.DIRECT)

I get the following error on Google Colab:

BrokenPipeError                           Traceback (most recent call last)
<ipython-input-11-1a3dc3ee7cba> in <module>()
     25 # create vectorized environment
     26 num_cpu = 2
---> 27 env = SubprocVecEnv([make_env(env_id, i) for i in range(num_cpu)])
     28 #env = DummyVecEnv([make_env(env_id, i) for i in range(num_cpu)])
     29 

5 frames
/usr/lib/python3.7/multiprocessing/popen_forkserver.py in _launch(self, process_obj)
     52         self.finalizer = util.Finalize(self, os.close, (self.sentinel,))
     53         with open(w, 'wb', closefd=True) as f:
---> 54             f.write(buf.getbuffer())
     55         self.pid = forkserver.read_signed(self.sentinel)
     56 

BrokenPipeError: [Errno 32] Broken pipe

I will greatly appreciate any help in this regard.

Thanks, Swagat

Miffyli commented 3 years ago

@araffin I have a faint recollection that somebody was working on Kuka environments related to SB3 (or the zoo), but I cannot seem to find it. Can you comment on this?

swagatk commented 3 years ago

Hi, I did some more investigation and have been able to fix these errors to some extent. I am sharing it here for the benefit of readers. I can avoid the previous error by passing the wrapper class as env_id and the environment instance through the env_kwargs dict. This allows me to make use of the make_vec_env function without any errors. Looking into the source code of this function helped.

The code now appears something like this:

env_id = NormalizeObsvnWrapper
env_kwargs = dict(
    env = KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
                          removeHeightHack=False)
)
vec_env = make_vec_env(env_id, n_envs=4, monitor_dir=monitor_path, env_kwargs=env_kwargs)
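
Since env_kwargs above forwards one pre-built env instance to every worker, an alternative is to construct the environment inside the factory so each worker builds its own copy and nothing has to be shared across processes. A minimal sketch (the helper make_kuka_env is hypothetical, not from the original post):

from stable_baselines3.common.vec_env import SubprocVecEnv

# hypothetical helper: each worker constructs its own env instance
def make_kuka_env(rank: int, seed: int = 0):
  def _init():
    env = NormalizeObsvnWrapper(KukaDiverseObjectEnv(
        maxSteps=20, isDiscrete=False, renders=False, removeHeightHack=False))
    env.seed(seed + rank)
    return env
  return _init

# vec_env = SubprocVecEnv([make_kuka_env(i) for i in range(4)])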

policy_kwargs = dict(
    features_extractor_class = CustomCNN,
    features_extractor_kwargs = dict(features_dim=64),
    net_arch = dict(qf=[128, 64, 32], pi=[128, 64, 64])
)

# create a model
model = SAC('CnnPolicy', vec_env, buffer_size=70000, batch_size=256,
            policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)

# train the model:
model.learn(total_timesteps=50000, log_interval=4, tb_log_name='kuka_sac_mp')

I get the following error this time:

ValueError                                Traceback (most recent call last)
<ipython-input-8-755f0f448b2d> in <module>()
     46 
     47 model = SAC('CnnPolicy', vec_env, buffer_size=70000, batch_size=256,
---> 48             policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)
     49 
     50 # train the model: 50K time steps is adequate

2 frames
/usr/local/lib/python3.7/dist-packages/stable_baselines3/common/base_class.py in __init__(self, policy, env, policy_base, learning_rate, policy_kwargs, tensorboard_log, verbose, device, support_multi_env, create_eval_env, monitor_wrapper, seed, use_sde, sde_sample_freq, supported_action_spaces)
    173             if not support_multi_env and self.n_envs > 1:
    174                 raise ValueError(
--> 175                     "Error: the model does not support multiple envs; it requires " "a single vectorized environment."
    176                 )
    177 

ValueError: Error: the model does not support multiple envs; it requires a single vectorized environment.

So, it turns out that the current SAC implementation does not support multiple environments. The code works if I use DummyVecEnv with a single environment (n_envs=1), as follows:

env = NormalizeObsvnWrapper(KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
                          removeHeightHack=False))
vec_env = DummyVecEnv([lambda: env])
policy_kwargs = dict(
    features_extractor_class = CustomCNN,
    features_extractor_kwargs = dict(features_dim=64),
    net_arch = dict(qf=[128, 64, 32], pi=[128, 64, 64])
)
model = SAC('CnnPolicy', vec_env, buffer_size=70000, batch_size=256,
            policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)
model.learn(total_timesteps=50000, log_interval=4, tb_log_name='kuka_sac_mp')

Apparently, the PPO implementation does support multi-processing. The following code seems to work:

env_id = NormalizeObsvnWrapper
env_kwargs = dict(
    env = KukaDiverseObjectEnv(maxSteps=20, isDiscrete=False, renders=False,
                          removeHeightHack=False)
)
vec_env = make_vec_env(env_id, n_envs=4, monitor_dir=monitor_path, env_kwargs=env_kwargs)

policy_kwargs = dict(
    features_extractor_class = CustomCNN,
    features_extractor_kwargs = dict(features_dim=64),
    net_arch = [128, dict(vf=[128, 64, 32], pi=[128, 64, 64])]    # check PPO documentation
)

# create a model
model = PPO('CnnPolicy', vec_env, n_steps=2048, batch_size=64,
            policy_kwargs=policy_kwargs, tensorboard_log=tb_log_path)

# train the model
model.learn(total_timesteps=50000, log_interval=4, tb_log_name='kuka_ppo_mp', callback=eval_callback)  # eval_callback defined elsewhere
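
As a follow-up, the trained model can be checked with SB3's evaluate_policy helper. A minimal sketch (the 10-episode count is an arbitrary choice, not from the original post):

from stable_baselines3.common.evaluation import evaluate_policy

# evaluate the trained PPO model on the vectorized env
mean_reward, std_reward = evaluate_policy(model, vec_env, n_eval_episodes=10)
print(f"mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")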

So, to conclude the discussion, SAC does not support multi-processing while PPO does, and there is no problem with the library; I was simply not using the interface properly. Thank you for creating this great library. Also, in this post, I demonstrate how to make use of Gym wrappers and custom policy networks with the KukaDiverseObject environment.

Regards, Swagat

araffin commented 3 years ago

> SAC does not support multi-processing while PPO does.

yes, that's a planned feature (https://github.com/DLR-RM/stable-baselines3/issues/179) and there is an experimental branch for it: https://github.com/DLR-RM/stable-baselines3/pull/439

If the issue is solved, you can close it ;)

swagatk commented 3 years ago

Yes, I think the issue is resolved. Thanks for your help. I am closing the thread.

yijionglin commented 3 years ago

@araffin

Hi,

It seems that SAC with multiprocessing still does not work with dict obs. Could I know when it will be implemented?

all the best,

Miffyli commented 3 years ago

@JatGwingLam See the above reply by araffin: SAC multiprocessing is not supported yet, and is planned/being worked on. If you really want to try it out, you could pull PR #439 and install SB3 from that and see if it works out.
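
A minimal way to try that PR locally (a sketch using GitHub's pull/<N>/head refs; the local branch name pr-439 is made up here):

# fetch the PR branch by number and install SB3 from it
git clone https://github.com/DLR-RM/stable-baselines3.git
cd stable-baselines3
git fetch origin pull/439/head:pr-439
git checkout pr-439
pip install -e .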

yijionglin commented 3 years ago

> @JatGwingLam See the above reply by araffin: SAC multiprocessing is not supported yet, and is planned/being worked on. If you really want to try it out, you could pull PR #439 and install SB3 from that and see if it works out.

Thanks for your reply, I've tried that and found dict obs is not supported.

Miffyli commented 3 years ago

> Thanks for your reply, I've tried that and found dict obs is not supported.

You could try merging the master branch into the PR branch; however, there were drastic changes in the dict-obs update, so I am going to assume it will not merge automatically and will require a ton of cleaning up.

@araffin any plans to continue on this some day?

araffin commented 3 years ago

> You could try merging the master branch into the PR branch; however, there were drastic changes in the dict-obs update, so I am going to assume it will not merge automatically and will require a ton of cleaning up.

The branch is up to date with master; it is just that the current multi-env implementation was not made (but can be adapted) for dict obs. I plan to continue working on that in September.

ruoshiliu commented 3 years ago

> So, to conclude the discussion, SAC does not support multi-processing while PPO does, and there is no problem with the library; I was simply not using the interface properly. Thank you for creating this great library. Also, in this post, I demonstrate how to make use of Gym wrappers and custom policy networks with the KukaDiverseObject environment.

This is so helpful! Thanks a bunch @swagatk !