hill-a / stable-baselines

A fork of OpenAI Baselines, implementations of reinforcement learning algorithms
http://stable-baselines.readthedocs.io/
MIT License

Sharing data between subprocesses [question] #1048

Closed rporotti closed 3 years ago

rporotti commented 3 years ago

Hi,

Cf. #530: I would like to share data between custom environments spawned with SubprocVecEnv. The solution suggested by @Miffyli there was to pass a multiprocessing.Queue() object to each environment. I tried that, but it apparently clashes with a multiprocessing instance already opened by SB. Is there a way to share data between the envs without relying on external callbacks? Here is an MWE of what I would like to achieve:

import gym
import stable_baselines
import multiprocessing

class BaseEnv(gym.Env):
    def __init__(self, counter, q):
        super(BaseEnv, self).__init__()
        self.counter = counter
        self.queue = q

        if self.counter == 0:
            self.queue.put("Hello")

def make_env(counter, q):
    def _init():
        env = BaseEnv(counter, q)
        return env
    return _init

if __name__ == '__main__':
    n_envs = 4
    q = multiprocessing.Queue()
    env = stable_baselines.common.vec_env.SubprocVecEnv([make_env(i, q) for i in range(n_envs)])

but I get: OSError: [Errno 9] Bad file descriptor. I know that this is strictly speaking not an SB problem, but I was wondering whether there is a possible fix. Thanks for your help!
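(An aside, not from the thread: part of the failure mode can be shown with plain multiprocessing. A multiprocessing.Queue is only meant to be transferred to a child through Process inheritance, and a plain pickle of it is refused outright; SubprocVecEnv cloudpickles the env factories, so a queue captured by them may get further than plain pickle but can then surface later as an error like the one above. A minimal sketch, independent of stable-baselines:)

```python
import multiprocessing
import pickle


def check_queue_picklable():
    q = multiprocessing.Queue()
    # Queue.__getstate__ checks that it is being pickled as part of
    # spawning a Process; any other pickle attempt raises RuntimeError
    # ("Queue objects should only be shared between processes through
    # inheritance").
    try:
        pickle.dumps(q)
        return "picklable"
    except RuntimeError as exc:
        return type(exc).__name__


if __name__ == "__main__":
    print(check_queue_picklable())
```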

araffin commented 3 years ago

Hello,

After some quick trials, it seems that you need to specify start_method="fork". I could get the following example to work on Linux (I also tested it with Stable-Baselines3 to ensure it was not a TensorFlow issue):

import multiprocessing

import numpy as np
import gym

from stable_baselines.common.vec_env import SubprocVecEnv

class CustomEnv(gym.Env):

    def __init__(self, counter, q):
        super(CustomEnv, self).__init__()
        self.observation_space = gym.spaces.Discrete(4)
        self.action_space = gym.spaces.Discrete(4)
        self.counter = counter
        self.queue = q

        # Only the first env puts a message; the others read from the queue
        if self.counter == 0:
            self.queue.put("Hello")
        else:
            print(self.queue.get())

    def reset(self):
        return self.observation_space.sample()

    def step(self, action):
        obs = self.observation_space.sample()
        reward = 1.0
        done = False
        info = {}
        return obs, reward, done, info

def make_env(counter, q):
    def _init():
        env = CustomEnv(counter, q)
        return env
    return _init

if __name__ == '__main__':
    n_envs = 2
    q = multiprocessing.Queue()
    env = SubprocVecEnv([make_env(i, q) for i in range(n_envs)], start_method="fork")
    obs = env.reset()
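(An editorial aside, not part of the original answer: the reason "fork" works is that a forked child inherits the parent's memory and file descriptors, so a queue captured by the env factory closures never has to be pickled. A minimal sketch with plain multiprocessing, independent of stable-baselines:)

```python
import multiprocessing


def run_demo():
    # "fork" copies the parent's memory into the child, so a closure that
    # captures the queue works without any pickling -- the same mechanism
    # SubprocVecEnv relies on here when the env factories close over the queue.
    ctx = multiprocessing.get_context("fork")
    q = ctx.Queue()

    def child():
        q.put("Hello")

    p = ctx.Process(target=child)
    p.start()
    msg = q.get()  # blocks until the child has put its message
    p.join()
    return msg


if __name__ == "__main__":
    print(run_demo())
```

Under the "spawn" start method (the default on macOS and Windows in recent Python versions), the same Process(target=child) call would fail, since a local function cannot be pickled; fork sidesteps pickling entirely.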

rporotti commented 3 years ago

Thanks, it works perfectly (on OSX too)! It was indeed a very quick fix, sorry for asking such a trivial question...