openai / gym

A toolkit for developing and comparing reinforcement learning algorithms.
https://www.gymlibrary.dev

Thread safety issue when running multiple environments in parallel using joblib #2333

Closed tarcey closed 3 years ago

tarcey commented 3 years ago

I want to run multiple separate environments of the same kind in parallel. Each environment is only ever called by one thread and runs independently of all other instances. As far as I can tell from the code, each environment should be called correctly and with correct synchronization (automatic, as in joblib's 'embarrassingly parallel' use case). However, the reset function does not register (despite returning an initial observation) when called asynchronously on multiple environments in parallel.

Assuming that each environment is entirely independent of all others and there are no thread-local variables, I would think that the code below would work. Is my assumption incorrect? What is the correct way to run multiple environments in parallel in gym?

Runnable example code below. With n_jobs=1, no problem occurs. When only the step function is called in parallel, but the reset function is called sequentially in plain Python, no error is thrown, but I'm hesitant to trust the output.

import tensorflow as tf
import gym
from joblib import Parallel, delayed

class BatchEnv:
    def __init__(self, env_id, num):
        self.env_id = env_id
        self.num = num

        self.envs = [gym.make(env_id) for _ in range(num)]

    def reset_envs(self, par):
        def reset(env):
            return env.reset()
        init_observations = par(delayed(reset)(env) for env in self.envs)

        return tf.constant(init_observations, dtype=tf.float32)

    def step_envs(self, actions, par):
        def step(env, action):
            observation, reward, done, _ = env.step(action)
            return [observation, reward, not done]

        measurements = par(delayed(step)(env, action) for env, action in zip(self.envs, actions))

        observations, rewards, alive = zip(*measurements)
        observations = tf.constant(observations, dtype=tf.float32)
        rewards = tf.constant(rewards, dtype=tf.float32)
        alive = tf.constant(alive, dtype=tf.dtypes.bool)

        return observations, rewards, alive

    def record_episodes(self, steps, policynet):
        with Parallel(n_jobs=-1) as par:
            observations = [None] * steps
            rewards = [None] * steps
            actions = [None] * steps
            alive = [None] * steps

            observations_t = self.reset_envs(par)
            print(observations_t.shape)
            # pstates_t = policynet.btu.get_initial_state(batch_size=self.num, dtype=en.dtype)

            for t in range(steps):
                # actions_t, action_sample_t, pstates_t = policynet.act_t(observations_t, pstates_t, training=False)
                actions_t, action_sample_t, pstates_t = None, [env.action_space.sample() for env in self.envs], None

                observations[t] = observations_t
                actions[t] = actions_t
                # action_sample_t = list(tf.squeeze(action_sample_t).numpy().tolist())
                observations_t, rewards[t], alive[t] = self.step_envs(action_sample_t, par)

        return observations, actions, rewards, alive

test = BatchEnv('CartPole-v0', 64)
test = test.record_episodes(60, None)
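A possible explanation, offered as a hedged sketch rather than a confirmed diagnosis: joblib's default backend runs tasks in worker processes, so each env passed through delayed(...) is pickled and the worker operates on a copy. The reset would then happen on the copy, and the environment held in self.envs would never see it. The snippet below imitates that round trip with pickle and a hypothetical ToyEnv class (a stand-in, not part of Gym), so it runs without Gym or joblib.

```python
import pickle


class ToyEnv:
    """Hypothetical stand-in for a Gym environment (not part of Gym)."""

    def __init__(self):
        self.was_reset = False

    def reset(self):
        self.was_reset = True
        return 0.0  # dummy initial observation


def reset_in_worker(env):
    """Imitate a process-based worker: it only sees a pickled copy of env."""
    env_copy = pickle.loads(pickle.dumps(env))
    return env_copy.reset()


env = ToyEnv()
observation = reset_in_worker(env)  # an observation comes back ...
copy_only = not env.was_reset       # ... but the original env was never reset
```

If this is what happens, it would also explain why stepping in parallel after a sequential reset raises no error yet looks untrustworthy: each step would run on a fresh copy of the same reset state. Asking joblib for threads, for example Parallel(n_jobs=-1, prefer="threads"), keeps the original objects, though that alone does not make the environments thread-safe.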

tristandeleu commented 3 years ago

If you want to run multiple environments in parallel, you could give AsyncVectorEnv a try. This runs each environment in a separate process though, not in a separate thread. Here is an example of how to use it:

import gym

envs = gym.vector.AsyncVectorEnv([lambda: gym.make('CartPole-v0')] * 64)
# Or alternatively
# envs = gym.vector.make('CartPole-v0', num_envs=64)

And then it behaves like any Gym environment:

observations = envs.reset()  # Shape (64, 4)
for _ in range(10):
    actions = envs.action_space.sample()  # Or your policy
    observations, rewards, dones, infos = envs.step(actions)

Here is some documentation about the VectorEnv API if you want to have a look at that. Unfortunately, Gym has no out-of-the-box equivalent for vectorized environments using threads, but this PR, https://github.com/openai/baselines/pull/958, which was opened against baselines, introduced a similar vectorized environment with threads.
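For reference, a thread-based batch can be sketched with the standard library alone. This is a minimal sketch under assumptions, not the implementation from the PR above and not part of Gym: ThreadedBatchEnv and CounterEnv are hypothetical names, and CounterEnv merely imitates the reset() / step() return signature used in this thread. Because threads share memory, the environments are mutated in place rather than copied.

```python
from concurrent.futures import ThreadPoolExecutor


class CounterEnv:
    """Hypothetical toy environment imitating the reset/step API used above."""

    def __init__(self, horizon=3):
        self.horizon = horizon
        self.t = None  # None until reset() is called

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        assert self.t is not None, "call reset() before step()"
        self.t += 1
        done = self.t >= self.horizon
        return self.t, float(action), done, {}


class ThreadedBatchEnv:
    """Hypothetical helper: runs one call per environment on a thread pool."""

    def __init__(self, env_fns, max_workers=None):
        self.envs = [fn() for fn in env_fns]
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def reset(self):
        # map() returns results in the same order as self.envs.
        return list(self.pool.map(lambda env: env.reset(), self.envs))

    def step(self, actions):
        results = self.pool.map(lambda pair: pair[0].step(pair[1]),
                                zip(self.envs, actions))
        observations, rewards, dones, infos = zip(*results)
        return list(observations), list(rewards), list(dones), list(infos)

    def close(self):
        self.pool.shutdown()


batch = ThreadedBatchEnv([CounterEnv] * 4)
first_obs = batch.reset()
obs, rewards, dones, infos = batch.step([1, 0, 1, 0])
batch.close()
```

Each environment is still touched by only one task at a time here, which is the property the original question relies on. For simulators written in pure Python, the global interpreter lock limits how much speedup threads can give, so a process-based vector environment may still be the faster option.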

tarcey commented 3 years ago

Thank you for the quick and helpful response, I will try that.