ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
32.94k stars 5.58k forks

[RLlib] League based PolicyMap across workers impacting scalability via memory use - Question/[Bug] #21459

Open heinerb opened 2 years ago

heinerb commented 2 years ago

Search before asking

Ray Component

RLlib

What happened + What you expected to happen

I am currently exploring heterogeneous league-based training with RLlib and running into scalability and implementation issues, which are noted below:

League Terminology Used In this Post:

  1. (Low priority, since a valid workaround exists) Given how env_runner in the sampler is implemented, resetting/initializing the running environments for the challenger policy between training iterations/matches (train step/callback for training results) must happen at the end of the first episode of the next training iteration/match. As a result, the next training iteration/match starts with samples from the previous tournament match. The workaround is in the provided example script.

https://github.com/ray-project/ray/blob/596c8e27726075623d51428c549304bc0f141f8d/rllib/evaluation/sampler.py#L963-L969

  2. (High priority, no good workaround currently known) The existing PolicyMap (src) is currently duplicated per rollout worker rather than held in a shared remote worker. When scaled out across multiple workers, this significantly duplicates the memory used for policies. By default each worker stores up to 100 policies in memory; this is configurable via the policy_map_capacity setting, which the example sets to 2 to keep the minimum number of policies in memory. However, when the CartPole-v0 and MountainCarContinuous-v0 examples are scaled across 250 workers, this can take 250GB of memory. Question: is there a recommended way to handle multiple policies in each worker that supports heterogeneous league-like runs, where challengers are constantly loaded in, without exhausting memory?

128 cores, 256 threads, v100, 256GB Memory image

Note: I am willing to provide more involved examples and answer questions as needed to resolve the mapping issue.

Question: is it possible to have a PolicyMap per node rather than per worker? It seems the policy map should be a shared resource on a node rather than being duplicated per worker on that node.
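
To make the duplication concrete, here is a rough back-of-the-envelope sketch, not a measurement. It only counts how many policy copies can stay resident under the two layouts. The helper name `resident_policy_copies` and the single-node layout are assumptions for illustration; real memory use also depends on model size and framework overhead.

```python
def resident_policy_copies(num_maps: int, policy_map_capacity: int) -> int:
    """Upper bound on policy copies held in memory across all policy maps.

    Hypothetical helper for illustration only: each map keeps at most
    `policy_map_capacity` policies in memory, so the total grows linearly
    with the number of maps.
    """
    return num_maps * policy_map_capacity


# Numbers taken from the issue text: 250 rollout workers, capacity 2.
per_worker = resident_policy_copies(num_maps=250, policy_map_capacity=2)

# Assumption: all 250 workers sit on one node that shares a single map.
per_node = resident_policy_copies(num_maps=1, policy_map_capacity=2)

print(per_worker)  # 500
print(per_node)    # 2

# Footprint reported above: about 250GB across 250 workers.
gb_per_worker = 250 / 250
print(gb_per_worker)  # 1.0
```

The point of the comparison is only that the per-worker layout scales with the worker count, while a per-node layout would scale with the node count.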

Versions / Dependencies

Ray Version:

TensorFlow

Python

OS

Reproduction script

from ray import tune
from ray.rllib.env import MultiAgentEnv
from gym.envs.classic_control.continuous_mountain_car import Continuous_MountainCarEnv
from gym.envs.classic_control.mountain_car import MountainCarEnv
from gym.envs.classic_control.cartpole import CartPoleEnv
from gym.spaces.dict import Dict
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.policy.policy import PolicySpec
import gym
from pprint import pprint
import ray
from ray.tune.utils.log import Verbosity
from ray.rllib.evaluation.rollout_worker import get_global_worker

champ_env = gym.make("CartPole-v0")
MODE_ENV = {
    "MountainCarContinuous-v0": gym.make("MountainCarContinuous-v0"), # Continuous_MountainCarEnv(),
    "CartPole-v0": gym.make("CartPole-v0") # CartPoleEnv(),
}
STARTING_MODE = 0

MAX_MODES = 100

POLICY_IDX = 0
GYM_IDX = 1
ENV_IDX = 2

def get_string_name(i=None):
    if i == 0:
        return "champion"
    else:
        return f"challenger{i}"

def get_mode(i):
    config = {}
    if i % 2 == 0:
        env_str = "CartPole-v0"
    else:
        env_str = "MountainCarContinuous-v0"
    agents = MODE_ENV[env_str]
    return (PolicySpec(config=config, action_space=agents.action_space, observation_space=agents.observation_space), env_str, get_string_name(i))

MODES = {}
print("setup modes")
for i in range(MAX_MODES + 1):
    MODES[i] = get_mode(i)
    print(f"\ti {i}")
    print(f"\ti%2 {i%2}")
    print(f"\tPolicy name --- {get_string_name(i)}")
    print(f"\t\tPolicy --- {MODES[i][0]}")
    print(f"\t\tGym --- {MODES[i][1]}")

class PolicySwitchingCallBack(DefaultCallbacks):
    """Example policy-switching callback for exploring memory growth and scaling issues with base gym environments
    when scaling out to hundreds of workers per node on larger systems
    """
    def __init__(self, legacy_callbacks_dict=None):
        super().__init__(legacy_callbacks_dict)
        self.new_mode = STARTING_MODE
        self.update_fn = False
        self.new_policy_name = get_string_name(self.new_mode + 1)
        self.old_policy_name = get_string_name(self.new_mode + 1)

        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            return str(agent_id)

        self.new_policy_map = policy_mapping_fn
        self.policy_names = set()
        self.policy_names.add(get_string_name(0))
        self.policy_names.add(get_string_name(1))

    def on_train_result(self, *, trainer, result, **kwargs):
        """Similar in concept to a trainer step: lets the setup change the challenger for a given tournament
        (the challenger-selection logic and other parts are omitted from this reduced sample).

        At every trainer step, epoch, or training iteration the trainer increments through a list of
        policies up to MAX_MODES and swaps in new policies. Each new policy propagates through the Trainer,
        WorkerSet, and rollout workers. Each rollout worker then saves a copy of the new policy in its
        local PolicyMap, whose capacity is configurable ({"multiagent":{"policy_map_capacity": 2 ...}}).

        Note: This is a reduced example, but each policy added could, for example, represent a new agent in a
        full heterogeneous league-play configuration.

        Because the PolicyMap caches policies out to disk once capacity is reached (the oldest-accessed policy
        goes to disk), removal of policies was not implemented.
           - Removing policies has implications for currently running episodes and for the PolicyMap's
             stash-to-disk call.
           - Additionally, this does not really solve the in-memory duplication across workers.
        """
        print(f"Current training iteration --- {trainer.iteration}")
        current_iteration = trainer.iteration

        if current_iteration % 1 != 0:
            return

        self.new_mode = (self.new_mode + 1) % MAX_MODES
        self.old_policy_name = self.new_policy_name
        self.new_policy_name = get_string_name(self.new_mode + 1)

        # Per training/match policy mapping -- every training iteration we swap out the non-trainable
        # challenger policy. In the actual setup this happens every episode to represent the match concept
        def policy_mapping_fn(agent_id, episode, worker, **kwargs):
            if agent_id == "0":
                return "champion"
            else:
                return self.new_policy_name

        # if trainer.get_policy(self.old_policy_name):
        #     trainer.remove_policy(policy_id=self.old_policy_name)

        if trainer.get_policy(self.new_policy_name) is None:
            mode = get_mode(self.new_mode + 1)
            trainer.add_policy(
                policy_id=self.new_policy_name,
                policy_cls=type(trainer.get_policy(get_string_name(0))),
                policy_mapping_fn=policy_mapping_fn,
                policies_to_train=[get_string_name(0)],
                observation_space=mode[POLICY_IDX].observation_space,
                action_space=mode[POLICY_IDX].action_space
            )

        self.policy_names.add(self.new_policy_name)
        print(f"policy Length = {len(self.policy_names)}")

        # Due to the nature of the env_runner in the rollout worker we have to update the environments at
        # episode end of the first trajectory in the next training iteration --- a limitation of how
        # env_runner is configured, though a minor item
        def update_worker_callback_flag(worker):
            # Advance the worker-side mode in step with the trainer-side increment above
            worker.callbacks.new_mode = (worker.callbacks.new_mode + 1) % MAX_MODES
            worker.callbacks.old_policy_name = worker.callbacks.new_policy_name
            worker.callbacks.new_policy_name = get_string_name(worker.callbacks.new_mode + 1)
            worker.callbacks.update_fn = True

        trainer.workers.foreach_worker(update_worker_callback_flag)

    def on_episode_end(self, *, worker, **kwargs) -> None:
        """Updates the environments so that they have the correct observations.

        Note: this has to happen in on_episode_end because already ongoing episodes will not change their
        mapping but will use the old mapping until the end of the episode. This is called at the end of
        an episode, before reset is called, so the next episode's sample collector has the proper obs setup.

        Note: the simplified example does not show resetting of the policy on each episode in a training
              iteration/match
        """
        # print(f"\tUpdate Env Settings ---- {self.update_fn}")
        if self.update_fn:
            # inplace reset of the worker context which is shared across various parts and the environment
            # which is accessed by workers sample collector and related parts
            def reset_env(env, ctx):
                ctx["mode"] = self.new_mode
                env.reinit({"mode": self.new_mode})

            # Per worker policy map fn updates - note in reduced form of problem no extra code was added
            # to handle oddities with global worker for local mode
            def policy_mapping_fn(agent_id, episode, worker, **kwargs):
                if agent_id == "0":
                    return "champion"
                else:
                    return worker.callbacks.new_policy_name

            worker.set_policy_mapping_fn(policy_mapping_fn)
            worker.set_policies_to_train([get_string_name(0)])
            worker.foreach_env_with_context(reset_env)

            self.update_fn = False

class ExampleSwitchingEnv(MultiAgentEnv):

    def __init__(self, config=None):
        # print("Initialize the Environment with agent configuration ---")
        self.reinit(config)
        self.config = config
        self.ep_step = 0

    @property
    def observation_space(self):
        # String keys match the agent ids produced by reset() and step()
        return Dict({str(i): agent.observation_space for i, agent in enumerate(self.agents)})

    @property
    def action_space(self):
        return Dict({str(i): agent.action_space for i, agent in enumerate(self.agents)})

    def reset(self):
        # print("Episode Steps")
        self.ep_step = 0
        # print(f"\tReset --- {self.ep_step}")
        self.dones = set()
        obs = {str(i): agent.reset() for i, agent in enumerate(self.agents)}
        # print(f"\t\tObservations --- {obs}")
        # print(f"\t\tObservation Space --- {self.observation_space}")
        # print(f"\t\tAction Space --- {self.action_space}")
        return obs

    def step(self, action_dict):
        self.ep_step = self.ep_step + 1
        obs, rew, done, info = {}, {}, {}, {}
        done["__all__"] = False
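        for agent_id, action in action_dict.items():
            # Agent ids are the stringified indices into self.agents (see reset()),
            # so look the sub-environment up by id instead of by iteration order
            index = int(agent_id)
            obs[agent_id], rew[agent_id], done[agent_id], info[agent_id] = self.agents[index].step(action)
            if done[agent_id]:
                done["__all__"] = True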

        # if self.ep_step == 1:
        #     print(f"\tStep --- {self.ep_step}")
        #     print(f"\t\tObservations --- {obs}")
        #     print(f"\t\tRewards ---{rew}")
        #     print(f"\t\tDones ---{done}")
        #     print(f"\t\tInfos ---{info}")

        return obs, rew, done, info

    def reinit(self, config):
        # print("Reinitialize the Environment with new agent configuration")
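        self.agents = []
        # Tolerate a missing config (e.g. ExampleSwitchingEnv() called with no arguments)
        if config is None:
            config = {}
        self.config = config
        mode = config.get("mode", 1)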
        # print(f"Mode --> {mode}")
        # print(f"\tChampion --> {get_string_name(0)}")
        # print(f"\t\tPolicy --> {MODES[0][POLICY_IDX]}")
        # print(f"\t\tGym --> {MODES[0][GYM_IDX]}")
        # self.agents.append(MODE_ENV[MODES[0][GYM_IDX]])
        self.agents.append(champ_env)
        # print(f"\tChallenger --> {get_string_name(mode+1)}")
        # print(f"\t\tPolicy --> {MODES[mode+1][POLICY_IDX]}")
        # print(f"\t\tGym --> {MODES[mode+1][GYM_IDX]}")
        self.agents.append(MODE_ENV[MODES[mode + 1][GYM_IDX]])

if __name__ == "__main__":
    # Each policy can have a different configuration (including custom model).
    def gen_policy(i):
        print(f"Policy name --- {get_string_name(i)}")
        print(f"\tPolicy --- {MODES[i][0]}")
        print(f"\tGym --- {MODES[i][1]}")
        return MODES[i][POLICY_IDX]

    # Setup PPO with an ensemble of `num_policies` different policies.

    policies = {get_string_name(i): gen_policy(i) for i in range(2)}

    policy_ids = list(policies.keys())

    pprint(policies)

    def policy_mapping_fn(agent_id, episode, worker, **kwargs):
        if agent_id == "0":
            return "champion"
        else:
            return "challenger1"

    config = {
        "env": ExampleSwitchingEnv,
        "callbacks": PolicySwitchingCallBack,
        "env_config": {
            "mode": STARTING_MODE,
        },
        "num_sgd_iter": 10,
        "num_workers": 10,
        "multiagent": {
            "policies": policies,
            "policy_mapping_fn": policy_mapping_fn,
            "policies_to_train": [get_string_name(0)],
            "policy_map_cache": "/opt/project/temp_cache",
            "policy_map_capacity": 2
        }
        # "verbose": Verbosity.V0_MINIMAL
    }
    ray.init(local_mode=False)
    results = tune.run("PPO", config=config)
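
The script defines nearly the same policy_mapping_fn in several places, some of which close over the callback object. One possible way to tidy this up is a small factory that captures only the challenger name. This is a minimal sketch under that assumption; `make_policy_mapping_fn` is a hypothetical helper, not an RLlib function, and it only mirrors the (agent_id, episode, worker, **kwargs) call shape used in the script.

```python
def make_policy_mapping_fn(challenger_id, champion_id="champion"):
    """Return a mapping function that pins the challenger for one match.

    Hypothetical helper, not part of RLlib. The closure holds only two
    strings, so it does not drag a callback object along with it.
    """
    def policy_mapping_fn(agent_id, episode=None, worker=None, **kwargs):
        # Agent "0" is always the champion; every other agent is the challenger.
        return champion_id if agent_id == "0" else challenger_id

    return policy_mapping_fn


mapping_fn = make_policy_mapping_fn("challenger3")
print(mapping_fn("0"))  # champion
print(mapping_fn("1"))  # challenger3
```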

Anything else

No response

Are you willing to submit a PR?

heinerb commented 2 years ago

@sven1977 Adding a little more detail.

The following image shows a single champion: image

Each worker, of which there are 60, has an RLlib PolicyMap configured to cache 10 policies in memory. Once capacity is reached, the oldest policies are written to disk and loaded back the next time they are used.

Note: With a policy cache capacity of 10, we move items to and from disk, which lowers throughput but keeps memory at reasonable values.
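
The cache-then-spill behavior described above can be sketched with a toy least-recently-used cache. This is an illustration of the general technique only, not RLlib's PolicyMap implementation; the class name `SpillingCache`, its methods, and the use of pickle files are all assumptions.

```python
import os
import pickle
import tempfile
from collections import OrderedDict


class SpillingCache:
    """Toy LRU cache that pickles evicted entries to disk (illustration only)."""

    def __init__(self, capacity, cache_dir):
        self.capacity = capacity
        self.cache_dir = cache_dir
        self.in_memory = OrderedDict()  # least recently used entry first
        self.disk_reads = 0

    def _path(self, key):
        return os.path.join(self.cache_dir, f"{key}.pkl")

    def put(self, key, value):
        self.in_memory[key] = value
        self.in_memory.move_to_end(key)
        self._evict_if_needed()

    def get(self, key):
        if key not in self.in_memory:
            # Cache miss: reload from disk, which is what costs throughput.
            with open(self._path(key), "rb") as f:
                self.in_memory[key] = pickle.load(f)
            self.disk_reads += 1
        self.in_memory.move_to_end(key)
        self._evict_if_needed()
        return self.in_memory[key]

    def _evict_if_needed(self):
        # Spill the least recently used entries until we are within capacity.
        while len(self.in_memory) > self.capacity:
            old_key, old_value = self.in_memory.popitem(last=False)
            with open(self._path(old_key), "wb") as f:
                pickle.dump(old_value, f)


with tempfile.TemporaryDirectory() as tmp:
    cache = SpillingCache(capacity=2, cache_dir=tmp)
    for name in ["champion", "challenger1", "challenger2"]:
        cache.put(name, {"weights": name})
    resident = list(cache.in_memory)        # champion has been spilled
    reloaded = cache.get("champion")        # read back, evicting challenger1
    resident_after = list(cache.in_memory)
```

Every worker holding its own cache like this is what keeps memory bounded per worker while still multiplying the total by the number of workers.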

Going back to the PolicyMap: as noted above, it lives at the worker level and not at the node level, leaving us with a duplicate of the above policies in every worker.

This is manageable for a single champion and can scale past 60 workers, but limits will clearly be reached... Multi-agent training has similar constraints, though they are less likely to be hit in simple agent-on-agent setups. NvN agent training would presumably hit similar limits to the champion vs. multiple challenger policies case.

Let's show an example of a small league with multiple champions.

image

This setup extends the above and runs six champions, a mix of Main-Agents and Main-Exploiters (with various selection strategies).

So the possible combinations of policies in a given worker are:

Now, in any given match only 1 champion + 8 challenger policies are used... which the policy map can handle. So we can cache items, which helps at the cost of throughput but at least allows us to run. However, in the example above we drop to less than 10% free memory on the nodes, which can cause stability concerns in Ray/RLlib. (Note: Ray posts a warning about this to the console at every report print.)

image

I am looking to see whether there is an implementation detail I am missing here for how to implement large groups of varying policies (act, obs, model, and optionally rewards and dones) in this setup.

gjoliver commented 2 years ago

Thanks for this long post. Great info. The policy map is currently per rollout worker. I guess your proposal is to make it per Ray worker node? My wild guess is that while it would certainly ease pressure on memory, it may result in performance issues and code complexity. The work described in this post is awesome. I wonder if we should add your study as an example, so your knowledge and tuning of hundreds of policies is shared with the community.

heinerb commented 1 year ago

Happy to have this added as an example.