Farama-Foundation / SuperSuit

A collection of wrappers for Gymnasium and PettingZoo environments (being merged into gymnasium.wrappers and pettingzoo.wrappers)

Using gym_vec_env with the Parallel API #43

Closed dkkim93 closed 3 years ago

dkkim93 commented 3 years ago

First of all, I appreciate the PettingZoo team for providing this helpful tool! :+1: I would like to use PettingZoo for multi-agent Atari games, such as the multi-agent Pong game. Specifically, I would like to train multiple agents using the parallel API (parallel_env) and collect multiple trajectories using the vectorized gym env API (gym_vec_env). Following the documentation, I have written the following:

from pettingzoo.atari import pong_v1
from supersuit import gym_vec_env

env = pong_v1.parallel_env(
    num_players=2,
    obs_type="grayscale_image",
    auto_rom_install_path="/home/dongki/.local/lib/python3.8/site-packages/multi_agent_ale_py")
env = gym_vec_env(env, num_envs=2, multiprocessing=True)

However, this throws the following error:

Traceback (most recent call last):
  File "main.py", line 179, in <module>
    main(args=args)
  File "main.py", line 47, in main
    env = gym_vec_env(env, num_envs=2, multiprocessing=True)
  File "/home/dongki/research/lids/git/meta-mapg/venv/lib/python3.8/site-packages/supersuit/vector_constructors.py", line 13, in gym_vec_env
    args = vec_env_args(env, num_envs)
  File "/home/dongki/research/lids/git/meta-mapg/venv/lib/python3.8/site-packages/supersuit/vector_constructors.py", line 9, in vec_env_args
    return [env_fn] * num_envs, env.observation_space, env.action_space
AttributeError: 'to_parallel' object has no attribute 'observation_space'

Looking into the issue, this appears to be because the parallel env has an observation_spaces attribute rather than an observation_space attribute (and likewise for the action space). Does the current version support vectorized gym environments with the parallel API? I appreciate your time and consideration!
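
For reference, a minimal sketch of the mismatch (the agent names first_0/second_0 are just what the two-player Atari envs use, and the ROM path argument is omitted for brevity):

from pettingzoo.atari import pong_v1

env = pong_v1.parallel_env(num_players=2, obs_type="grayscale_image")

# The parallel API exposes per-agent dicts keyed by agent name...
print(env.observation_spaces)  # {"first_0": Box(...), "second_0": Box(...)}
print(env.action_spaces)       # {"first_0": Discrete(...), "second_0": Discrete(...)}

# ...whereas supersuit's vec_env_args expects the single-agent attributes
# env.observation_space / env.action_space, hence the AttributeError above.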

benblack769 commented 3 years ago

First, our API and the gym API are different. Gym's VectorEnv does not, and will never, support PettingZoo environments.

Vector environments for multiplayer environments are harder to design and use than those for single agents. I have a prototype I have been working on here, but it isn't production ready and makes certain assumptions about the environment.

For now, I would say that vector environments for PettingZoo are very much future work, so I'll take this as a feature request.

What is the particular use case you hope to use vector environments for? What RL system are you planning to use? What do you think a multiplayer vector environment should look like?

dkkim93 commented 3 years ago

Thank you for your reply, and I apologize for the lack of clarity. To clarify, I would like to better understand SuperSuit's vectorized env function and how to use it with PettingZoo's parallel env API. My current understanding is that the vectorized env function makes it possible to step multiple copies of an environment simultaneously across parallel workers.

My use case is meta-learning in multi-agent settings, where each agent updates its policy (i.e., an inner-loop update) with a policy gradient algorithm (e.g., REINFORCE). Specifically, the agents first interact together in a shared environment (e.g., 2-agent Pong) and collect K trajectories. Then, each agent updates its policy using the K trajectories.

Without the vectorized environment, I can collect K trajectories with a for loop that iterates from 1 to K. However, to reduce training time, I am hoping to use the vectorized function to collect the K trajectories simultaneously. Therefore, I wonder whether I can use the vectorized function as follows (assuming 2 agents for clarity):

from pettingzoo.atari import pong_v1
from supersuit import gym_vec_env

# Set parallel env with vectorized function (K = number of trajectories to collect)
parallel_env = pong_v1.parallel_env(
    num_players=2,
    obs_type="grayscale_image",
    auto_rom_install_path="/home/dongki/.local/lib/python3.8/site-packages/multi_agent_ale_py")
parallel_env = gym_vec_env(parallel_env, num_envs=K, multiprocessing=True)

# Collect K trajectories
observations = parallel_env.reset()  # Returns {"first_0": shape(K, observation dim), "second_0": shape(K, observation dim)}
while True:
    actions = {agent: policy(observations[agent], agent) for agent in parallel_env.agents}
    observations, rewards, dones, infos = parallel_env.step(actions)
    # observations: {"first_0": shape(K, observation dim), "second_0": shape(K, observation dim)}
    # rewards: {"first_0": shape(K, 1), "second_0": shape(K, 1)}
    # dones: {"first_0": shape(K, 1), "second_0": shape(K, 1)}

Previously, in a multi-agent MuJoCo domain, I used a SubprocVecEnv-style environment to achieve the above, and I wonder whether SuperSuit's vectorized env function provides similar functionality.

Thank you for your time and help!


After reading the documentation further, should I be using stable_baselines_vec_env instead of gym_vec_env for my purpose?

benblack769 commented 3 years ago

Sorry about the late reply.

  1. No, SuperSuit's vec_env functions only support gym environments. It would be great if there could be something similar for PettingZoo environments some day, though.
  2. Thanks for the example code. That really helps inform how this feature should be designed.
  3. stable_baselines_vec_env and gym_vec_env should be functionally equivalent for most use cases. The only reason we have two of them is that stable_baselines_vec_env has better interoperability with the Stable Baselines library.

As for when this feature can be available: probably not for a while (at least several weeks). Vector environments are tricky to implement and time-consuming to test, due to the use of process-shared memory, etc.
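
For concreteness, a rough sketch of the kind of usage that is supported today (a plain single-agent gym env; CartPole is only an illustrative stand-in, and exact return types depend on your gym version):

import gym
from supersuit import gym_vec_env

env = gym.make("CartPole-v1")                             # single-agent gym env
venv = gym_vec_env(env, num_envs=4, multiprocessing=True)

obs = venv.reset()                                        # one stacked observation per copy
actions = [env.action_space.sample() for _ in range(4)]   # one action per copy
obs, rewards, dones, infos = venv.step(actions)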

dkkim93 commented 3 years ago

Thank you again for your reply @weepingwillowben. I have worked on the above functionality based on the SubprocVecEnv approach, and the code is almost ready. After verification, I will post example code in this thread or create a pull request. Thanks!

uhlajs commented 3 years ago

@dkkim93 Can you share the code, please?

dkkim93 commented 3 years ago

@uhlajs Please refer to the example code below. In summary, I have used the SubprocVecEnv approach, assuming two agents named first_0 and second_0. Please refer to my comments in multiprocessing_env.py for more details.


main.py

import argparse
import numpy as np
from pathlib import Path
from pettingzoo.atari import boxing_v1
from multiprocessing_env import SubprocVecEnv
from supersuit import resize_v0, frame_skip_v0, frame_stack_v1

def make_env(args):
    path = str(Path.home())

    def _make_env():
        env = boxing_v1.parallel_env(
            obs_type="grayscale_image",
            full_action_space=False,
            auto_rom_install_path=path + "/.local/lib/python3.6/site-packages/multi_agent_ale_py")
        env = resize_v0(env, x_size=84, y_size=84)
        env = frame_stack_v1(env, 4)
        env = frame_skip_v0(env, 4)
        return env

    env = SubprocVecEnv([_make_env for _ in range(args.traj_batch_size)])

    return env

def main(args):
    # Make parallel_env
    env = make_env(args)

    # Collect K trajectories (K=10 in this example)
    obs = env.reset()  # obs shape: (10, 84, 84, 4)

    for timestep in range(args.ep_horizon):
        # Get actions
        actions = []
        for i_agent in range(args.n_agent):
            action = np.random.randint(low=0, high=env.action_space[i_agent].n, size=(args.traj_batch_size,))
            actions.append(action)

        # Take step in the environment
        # next_obs shape: (10, 84, 84, 4)
        # rewards shape: (10, 2)
        # dones shape: (10, 2)
        next_obs, rewards, dones, _ = env.step(actions)

        # For next timestep
        obs = next_obs

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="example")

    # Algorithm
    parser.add_argument(
        "--traj-batch-size", type=int, default=10,
        help="Number of trajectories for each update")

    # Env
    parser.add_argument(
        "--ep-horizon", type=int, default=500,
        help="Episode is terminated when max timestep is reached")
    parser.add_argument(
        "--n-agent", type=int, default=2,
        help="Number of agents in a shared environment")

    args = parser.parse_args()
    main(args=args)
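
(For reference: running main.py with the argparse defaults above steps 10 environment copies in parallel for 500 timesteps, taking random actions for the 2 boxing agents.)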

multiprocessing_env.py

import numpy as np
from multiprocessing import Process, Pipe

def worker(remote, parent_remote, env_fn_wrapper):
    """Worker class

    References:
        https://github.com/openai/baselines/tree/master/baselines/common/vec_env
    """
    parent_remote.close()
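    # Build the environment inside the subprocess; only the cloudpickled factory (env_fn_wrapper) is sent to the worker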
    env = env_fn_wrapper.x()
    while True:
        cmd, data = remote.recv()
        if cmd == 'step':
            data = {"first_0": data[0], "second_0": data[1]}  # NOTE Assuming two agents: "first_0" and "second_0"
            ob, reward, dones, info = env.step(data)
            ob = list(ob.values())[0]  # NOTE Assuming same observations between agents
            reward = list(reward.values())
            dones = list(dones.values())
            info = list(info.values())
            remote.send((ob, reward, dones, info))
        elif cmd == 'reset':
            ob = env.reset()
            ob = list(ob.values())[0]  # NOTE Assuming same observations between agents
            remote.send(ob)
        elif cmd == 'close':
            remote.close()
            break
        elif cmd == 'get_spaces':
            observation_spaces = list(env.observation_spaces.values())
            action_spaces = list(env.action_spaces.values())
            remote.send((observation_spaces, action_spaces))
        elif cmd == 'seed':
            env.seed(data)
        elif cmd == 'render':
            env.render()
        else:
            raise NotImplementedError

class VecEnv(object):
    """An abstract asynchronous, vectorized environment

    References:
        https://github.com/openai/baselines/tree/master/baselines/common/vec_env
    """
    def __init__(self, num_envs, observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space

    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a tuple of observation arrays.
        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """
        pass

    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.
        You should not call this if a step_async run is
        already pending.
        """
        pass

    def step_wait(self):
        """
        Wait for the step taken with step_async().
        Returns (obs, rews, dones, infos):
         - obs: an array of observations, or a tuple of
                arrays of observations.
         - rews: an array of rewards
         - dones: an array of "episode done" booleans
         - infos: a sequence of info objects
        """
        pass

    def close(self):
        """
        Clean up the environments' resources.
        """
        pass

    def step(self, actions):
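        # actions arrives as a list of per-agent arrays, each of shape (num_envs,);
        # stacking on axis=1 yields one row of agent actions per environment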
        actions = np.stack(actions, axis=1)
        self.step_async(actions)
        return self.step_wait()

class CloudpickleWrapper(object):
    """Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)

    References:
        https://github.com/openai/baselines/tree/master/baselines/common/vec_env
    """
    def __init__(self, x):
        self.x = x

    def __getstate__(self):
        import cloudpickle
        return cloudpickle.dumps(self.x)

    def __setstate__(self, ob):
        import pickle
        self.x = pickle.loads(ob)

class SubprocVecEnv(VecEnv):
    """Vectorized environment class that collects samples in parallel using subprocesses

    Args:
        env_fns (list): list of callables, each returning an environment to run in a subprocess

    References:
        https://github.com/openai/baselines/tree/master/baselines/common/vec_env
    """
    def __init__(self, env_fns):
        self.env = env_fns[0]()
        self.waiting = False
        self.closed = False
        self.nenvs = len(env_fns)
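        # Create one pipe per environment: self.remotes stay in the main process, self.work_remotes go to the worker subprocesses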
        self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(self.nenvs)])
        self.ps = [
            Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
            for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True  # If the main process crashes, we should not cause things to hang
            p.start()
        for remote in self.work_remotes:
            remote.close()

        self.remotes[0].send(('get_spaces', None))
        self.observation_space, self.action_space = self.remotes[0].recv()
        VecEnv.__init__(self, len(env_fns), self.observation_space, self.action_space)

    def seed(self, value):
        for i_remote, remote in enumerate(self.remotes):
            remote.send(('seed', value + i_remote))

    def step_async(self, actions):
        for remote, action in zip(self.remotes, actions):
            remote.send(('step', action))
        self.waiting = True

    def step_wait(self):
        results = [remote.recv() for remote in self.remotes]
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return np.stack(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        for remote in self.remotes:
            remote.send(('reset', None))
        return np.stack([remote.recv() for remote in self.remotes])

    def render(self):
        self.remotes[0].send(('render', None))

    def close(self):
        if self.closed:
            return
        if self.waiting:
            for remote in self.remotes:
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for p in self.ps:
            p.join()
        self.closed = True

    def sample_personas(self, is_train, is_val=True, path="./"):
        # NOTE: only meaningful for environments that implement sample_personas;
        # it is not used by the Atari example in main.py above
        return self.env.sample_personas(is_train=is_train, is_val=is_val, path=path)

dkkim93 commented 3 years ago

I mistakenly reopened the issue. I apologize for my mistake!

gonultasbu commented 11 months ago

I have extended this method to cover more use cases and dimensions here:

https://github.com/AgileRL/AgileRL/blob/main/agilerl/utils/multiprocessing_env.py