ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[rllib] Slow convergence time compared to other libraries #8088

Closed matej-macak closed 4 years ago

matej-macak commented 4 years ago

RLlib converges slowly on a simple environment compared with equivalent algorithms from other libraries under the same conditions (see the results below). Is this expected, or is there an approach that I overlooked when running the program?

The environment itself is trivial (the reward is simply the sum of all the continuous actions), yet the RLlib algorithm takes a fairly long time to converge. I have tried adjusting the learning rate and other parameters, but none of them changed the very gradual and slow learning process.
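As a quick sanity check on the scale of the problem, the sketch below reproduces only the reward rule described above (the sum of a 1000-dimensional action in [0, 1]) with NumPy. The name `reward_fn` and the seed are illustrative assumptions; the sketch does not use gym or RLlib.

```python
import numpy as np

DIMENSIONS = 1000  # matches the default action dimension of TestEnv


def reward_fn(action):
    """Reward rule from the issue: the sum of all action components."""
    return float(np.sum(action))


rng = np.random.default_rng(0)  # arbitrary seed, for reproducibility only

# Best possible action: every component at the upper bound of Box(0, 1).
max_reward = reward_fn(np.ones(DIMENSIONS))

# Uniformly random actions give a baseline of roughly DIMENSIONS / 2.
random_rewards = [reward_fn(rng.uniform(0.0, 1.0, DIMENSIONS)) for _ in range(200)]
mean_random_reward = float(np.mean(random_rewards))

print("max reward:", max_reward)
print("mean reward of a uniform random policy:", round(mean_random_reward, 1))
```

Under these assumptions, a converged policy should approach a reward of 1000, and a uniformly random one should sit near 500.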

image

Ubuntu 18.04
Python == 3.7.7
Ray version == 0.8.4
Tensorflow version == 1.15.0
PyTorch version == 1.4.0

System information:
i7-8750H (6 physical cores)
Nvidia Geforce 1050Ti (CUDA 10.1)

Steps to reproduce the issue:

RLlib code

import numpy as np
from csv import writer

import gym
from gym import spaces

import ray
from ray.tune.registry import register_env
import ray.rllib.agents.ppo as ppo
import ray.rllib.agents.impala as impala
from ray.tune.logger import pretty_print

class TestEnv(gym.Env):

    def __init__(self, dimensions = 1000, observations = 100):

        self.action_space = spaces.Box(low = 0, high = 1, shape = (dimensions, ))
        self.observation_space = spaces.Box(low = 0, high = 1, shape = (observations, ))

        self.obs = np.random.rand(observations)

    def step(self, action):

        reward = np.sum(action)
        observation = self.obs
        done = True

        return observation, reward, done, {}

    def reset(self):

        observation = self.obs

        return observation

# Base settings
num_steps = 100
save_paths = {"impala": "./logs/impala_evaluations", 
              "ppo": "./logs/ppo_evaluations"}

# Initialisation
ray.init()
register_env("TestEnv", lambda config: TestEnv()) #register custom environment 

for algorithm in ["ppo","impala"]:

    # Initialise algos
    if algorithm == "impala":
        config = impala.DEFAULT_CONFIG.copy()
    elif algorithm == "ppo":
        config = ppo.DEFAULT_CONFIG.copy()

    # Setup config to be similar to the stable baselines
    config["num_gpus"] = 1
    config["num_workers"] = 6
    config["model"]["fcnet_hiddens"] = [64,64]

    if algorithm == "impala":
        trainer = impala.ImpalaTrainer(config=config, env="TestEnv")
    elif algorithm == "ppo":
        config["use_pytorch"] = True # Cannot align GPU
        trainer = ppo.PPOTrainer(config=config, env="TestEnv")

    first_save = True
    log_path = save_paths[algorithm]

    for i in range(num_steps):
        # Perform one training iteration with the selected algorithm
        result = trainer.train()

        # Output results
        print("Step: {}, Reward {}, Max reward: {}, Timesteps: {}, Time since start: {}".format(
            i,
            result["episode_reward_mean"],
            result["episode_reward_max"],
            result["timesteps_total"],
            result["time_total_s"]))

        # Write csv
        csv_rows = [result["timesteps_total"], result["episode_reward_mean"], result["time_total_s"]]
        if first_save:
            first_save = False
            with open("{}.csv".format(log_path), "w") as fp:
                csv_writer = writer(fp)
                csv_writer.writerow(["Timesteps", "Mean_reward", "Time"])
                csv_writer.writerow(csv_rows)
        else:
            with open("{}.csv".format(log_path), "a+") as fp:
                csv_writer = writer(fp)
                csv_writer.writerow(csv_rows)

        if i % 100 == 0:
            checkpoint = trainer.save()
            print("checkpoint saved at", checkpoint)

Stable baselines code

import gym
from gym import spaces

from stable_baselines import DQN, PPO2
from stable_baselines.common.evaluation import evaluate_policy
from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines.common import set_global_seeds

# EvalCallback requirements
import numpy as np
import os
import warnings
from stable_baselines.common.callbacks import EventCallback, BaseCallback
from typing import Union, List, Dict, Any, Optional
from stable_baselines.common.vec_env import VecEnv, sync_envs_normalization, DummyVecEnv
import time
from csv import writer


class EvalCallback(EventCallback):
    """
    Callback for evaluating an agent.
    :param eval_env: (Union[gym.Env, VecEnv]) The environment used for initialization
    :param callback_on_new_best: (Optional[BaseCallback]) Callback to trigger
        when there is a new best model according to the `mean_reward`
    :param n_eval_episodes: (int) The number of episodes to test the agent
    :param eval_freq: (int) Evaluate the agent every eval_freq call of the callback.
    :param log_path: (str) Path to a folder where the evaluations (`evaluations.npz`)
        will be saved. It will be updated at each evaluation.
    :param best_model_save_path: (str) Path to a folder where the best model
        according to performance on the eval env will be saved.
    :param deterministic: (bool) Whether the evaluation should
        use stochastic or deterministic actions.
    :param render: (bool) Whether to render or not the environment during evaluation
    :param verbose: (int)
    """
    def __init__(self, eval_env: Union[gym.Env, VecEnv],
                 callback_on_new_best: Optional[BaseCallback] = None,
                 n_eval_episodes: int = 5,
                 eval_freq: int = 10000,
                 log_path: str = None,
                 best_model_save_path: str = None,
                 deterministic: bool = True,
                 render: bool = False,
                 verbose: int = 1):
        super(EvalCallback, self).__init__(callback_on_new_best, verbose=verbose)
        self.n_eval_episodes = n_eval_episodes
        self.eval_freq = eval_freq
        self.best_mean_reward = -np.inf
        self.last_mean_reward = -np.inf
        self.deterministic = deterministic
        self.render = render

        self.start_time = time.time()
        self.results = []
        self.first_save = True

        # Convert to VecEnv for consistency
        if not isinstance(eval_env, VecEnv):
            eval_env = DummyVecEnv([lambda: eval_env])

        assert eval_env.num_envs == 1, "You must pass only one environment for evaluation"

        self.eval_env = eval_env
        self.best_model_save_path = best_model_save_path
        # Logs will be written in `evaluations.npz`
        if log_path is not None:
            log_path = os.path.join(log_path, 'evaluations')
        self.log_path = log_path
        self.evaluations_results = []
        self.evaluations_timesteps = []
        self.evaluations_length = []

    def _init_callback(self):
        # Does not work in some corner cases, where the wrapper is not the same
        if type(self.training_env) is not type(self.eval_env):
            warnings.warn("Training and eval env are not of the same type: "
                          "{} != {}".format(self.training_env, self.eval_env))

        # Create folders if needed
        if self.best_model_save_path is not None:
            os.makedirs(self.best_model_save_path, exist_ok=True)
        if self.log_path is not None:
            os.makedirs(os.path.dirname(self.log_path), exist_ok=True)

    def _on_step(self) -> bool:

        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            # Sync training and eval env if there is VecNormalize
            sync_envs_normalization(self.training_env, self.eval_env)

            episode_rewards, episode_lengths = evaluate_policy(self.model, self.eval_env,
                                                               n_eval_episodes=self.n_eval_episodes,
                                                               render=self.render,
                                                               deterministic=self.deterministic,
                                                               return_episode_rewards=True)

            if self.log_path is not None:
                self.evaluations_timesteps.append(self.num_timesteps)
                self.evaluations_results.append(episode_rewards)
                self.evaluations_length.append(episode_lengths)
                np.savez(self.log_path, timesteps=self.evaluations_timesteps,
                         results=self.evaluations_results, ep_lengths=self.evaluations_length)

            mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
            mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(episode_lengths)
            # Keep track of the last evaluation, useful for classes that derive from this callback
            self.last_mean_reward = mean_reward

            # Write to csv file
            time_since_start = time.time() - self.start_time
            csv_rows = [self.num_timesteps, mean_reward, std_reward, time_since_start]
            if self.first_save:
                self.first_save = False
                with open("{}.csv".format(self.log_path), "w") as fp:
                    csv_writer = writer(fp)
                    csv_writer.writerow(["Timesteps", "Mean_reward", "Std_reward", "Time"])
                    csv_writer.writerow(csv_rows)
            else:
                with open("{}.csv".format(self.log_path), "a+") as fp:
                    csv_writer = writer(fp)
                    csv_writer.writerow(csv_rows)

            if self.verbose > 0:
                print("Eval num_timesteps={}, "
                      "episode_reward={:.2f} +/- {:.2f}, "
                      "Episode length: {:.2f} +/- {:.2f}, "
                      "Time since start: {:.0f}".format(self.num_timesteps, mean_reward, std_reward, mean_ep_length, std_ep_length, time.time() - self.start_time))

            if mean_reward > self.best_mean_reward:
                if self.verbose > 0:
                    print("New best mean reward!")
                if self.best_model_save_path is not None:
                    self.model.save(os.path.join(self.best_model_save_path, 'best_model'))
                self.best_mean_reward = mean_reward
                # Trigger callback if needed
                if self.callback is not None:
                    return self._on_event()

        return True

def make_env(base_env):
    """
    Utility function for multiprocessed env.

    :param base_env: (callable) environment class or factory, called with no
        arguments inside each subprocess to create the environment
    :return: (callable) function that builds and returns the environment
    """
    def _init():
        env = base_env()
        return env
    return _init

class TestEnv(gym.Env):

    def __init__(self, dimensions = 1000, observations = 100):

        self.action_space = spaces.Box(low = 0, high = 1, shape = (dimensions, ))
        self.observation_space = spaces.Box(low = 0, high = 1, shape = (observations, ))

        self.obs = np.random.rand(observations)

    def step(self, action):

        reward = np.sum(action)

        observation = self.obs

        done = True

        return observation, reward, done, {}

    def reset(self):

        observation = self.obs

        return observation

if __name__ == "__main__":

    num_cpu = 6

    # Create environment
    env = SubprocVecEnv([make_env(TestEnv) for i in range(num_cpu)])

    # Instantiate the agent
    model = PPO2('MlpPolicy', env, verbose=0)

    # Evaluate the agent
    eval_env = TestEnv()
    eval_callback = EvalCallback(eval_env, best_model_save_path='./logs/',
                                 log_path='./logs/', eval_freq=1000,
                                 deterministic=True, render=False, verbose=1)

    model.learn(total_timesteps=int(1e8), callback=eval_callback)

regproj commented 4 years ago

I can't answer your question, but could you try plotting with the number of timesteps on the x-axis? Sampling in Ray may be slower than in other libraries.

5cp commented 4 years ago

Also, try reducing "num_sgd_iter" for PPO (the default is 30, which seems high), and try PPO with TensorFlow.

sven1977 commented 4 years ago

@matej-macak thanks for filing this! It seems to be simply related to the default hyperparameters. 1) Could you try setting kl_coeff in your config to 0.0? 2) Also, it's probably better to use no gamma at all since it is a context-less env (set gamma to 0.0).

Please let us know whether this works.
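A minimal sketch of applying the two suggested overrides without mutating the shared defaults. `BASE_CONFIG` is a hypothetical stand-in for `ppo.DEFAULT_CONFIG` with placeholder values (only `num_sgd_iter` = 30 is quoted in this thread), and `with_overrides` is an illustrative helper, not an RLlib function.

```python
import copy

# Hypothetical stand-in for ppo.DEFAULT_CONFIG. Only keys discussed in this
# thread are listed, and the values are placeholders for illustration.
BASE_CONFIG = {
    "gamma": 0.99,
    "kl_coeff": 0.2,
    "num_sgd_iter": 30,
    "model": {"fcnet_hiddens": [256, 256]},
}


def with_overrides(base, overrides):
    """Return a deep copy of `base` with `overrides` applied on top.

    The deep copy keeps nested dicts such as "model" from being shared with
    (and mutated in) the base config, which a shallow .copy() would allow.
    """
    config = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(config.get(key), dict):
            config[key].update(value)
        else:
            config[key] = value
    return config


suggested = with_overrides(BASE_CONFIG, {
    "kl_coeff": 0.0,  # suggestion 1: zero out the KL coefficient
    "gamma": 0.0,     # suggestion 2: no discounting for a context-less env
    "model": {"fcnet_hiddens": [64, 64]},
})
print(suggested)
```

The resulting dict could then be passed as `config=` to the trainer in the same way as in the repro script above.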

matej-macak commented 4 years ago

Hi all,

thank you very much - really amazing community for such a quick set of answers. Here are the tests that I have run:

Test 1 - suggested by @sven1977

In this test, I have set kl_coeff to 0.0 and gamma to 0.0. Agreed that this is a context-less environment, but I wanted to compare like for like (as in stable-baselines). This definitely helped with convergence, although it is still a bit slower. It got me going on a good trajectory though; I could probably use Tune to find a set of coefficients that improves this even further. Is there any reason why the

Test 2 - suggested by @5cp

I set the num_sgd_iter to 15 (not sure what the right one should be in this case).

image

@regproj - Yes, I have noticed that the timesteps are lower than in the stable-baselines case, but what I am interested in is the wall time, not the timestep efficiency (i.e. how quickly I can get there with the same level of resources).

Overall, I find Ray amazing, and it definitely outperforms stable-baselines on many Atari benchmarks I have tested. As the problem I am trying to solve is more similar to the TestEnv class, I wanted to solve this toy example before deploying a cluster for it.
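One way to make the wall-time comparison concrete is to measure how long each run takes to reach a target reward, using the "Time" and "Mean_reward" columns that both scripts above write to CSV. The helper `time_to_threshold` and the two curves below are illustrative assumptions; the numbers are synthetic and are not measurements from this issue.

```python
import numpy as np


def time_to_threshold(times, rewards, threshold):
    """Return the first wall-clock time at which reward >= threshold, or None."""
    times = np.asarray(times, dtype=float)
    rewards = np.asarray(rewards, dtype=float)
    reached = np.nonzero(rewards >= threshold)[0]
    return float(times[reached[0]]) if reached.size else None


# Synthetic curves for illustration only; these are NOT measured results.
fast_run = {"Time": [10, 20, 30, 40], "Mean_reward": [500, 800, 950, 990]}
slow_run = {"Time": [10, 20, 30, 40], "Mean_reward": [500, 600, 700, 800]}

fast_time = time_to_threshold(fast_run["Time"], fast_run["Mean_reward"], 900)
slow_time = time_to_threshold(slow_run["Time"], slow_run["Mean_reward"], 900)
print("fast run reached 900 at:", fast_time)
print("slow run reached 900 at:", slow_time)
```

A run that never reaches the threshold returns None, which keeps "did not converge in the logged window" distinct from "converged late".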

sven1977 commented 4 years ago

Hey @matej-macak, actually, yeah, I also noticed very slow convergence that has to do with the (continuous) action space being bounded for PPO. For bounded continuous actions, we simply clip the output actions before sending them to the env. I'll look into this further.
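A rough NumPy sketch of what clipping does to the reward signal in this environment. It assumes the untrained policy samples from something like a unit Gaussian, which the thread does not confirm, and it does not call any RLlib code.

```python
import numpy as np

LOW, HIGH, DIMENSIONS = 0.0, 1.0, 1000
rng = np.random.default_rng(0)  # arbitrary seed, for reproducibility only

# Assumption: an untrained policy samples roughly from a unit Gaussian.
raw_action = rng.normal(loc=0.0, scale=1.0, size=DIMENSIONS)

# What the environment receives once the action is clipped to Box(0, 1).
clipped_action = np.clip(raw_action, LOW, HIGH)
frac_at_low = float(np.mean(clipped_action == LOW))
frac_at_high = float(np.mean(clipped_action == HIGH))

# Move every component that is far below the lower bound a little closer to it.
# The clipped action, and therefore the reward, stays exactly the same.
far_below = raw_action < LOW - 0.1
nudged_action = np.clip(raw_action + 0.05 * far_below, LOW, HIGH)
reward_before = float(np.sum(clipped_action))
reward_after = float(np.sum(nudged_action))

print("fraction clipped to lower bound:", frac_at_low)
print("fraction clipped to upper bound:", frac_at_high)
print("reward unchanged by the nudge:", reward_before == reward_after)
```

Under this assumption, a large share of components sits on a bound, and the reward does not respond to changes in components that are already outside the bounds. That is one plausible contributor to the slow learning described here, not a confirmed diagnosis.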

regproj commented 4 years ago

Does the action distribution for ppo (or ddpg, sac, etc.) start off as a unit gaussian? If so, would it be better for convergence if we set the environment action bounds to something like -1,1 and rescaled it inside the environment?
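The rescaling idea in this question can be sketched as a small helper applied at the top of the environment's `step`. The function name `rescale_action` is an assumption for illustration and is not an RLlib or gym API.

```python
import numpy as np


def rescale_action(action, low, high):
    """Map an action component-wise from [-1, 1] to [low, high]."""
    action = np.clip(np.asarray(action, dtype=float), -1.0, 1.0)
    return low + 0.5 * (action + 1.0) * (high - low)


# The policy would act in Box(-1, 1); the env converts to its own bounds.
unit_box = rescale_action([-1.0, 0.0, 1.0], low=0.0, high=1.0)
shifted_box = rescale_action([-1.0, 0.0, 1.0], low=1.0, high=3.0)
print(unit_box)
print(shifted_box)
```

With this in place, the agent-facing action space would be declared as `spaces.Box(low=-1, high=1, ...)`, while the reward is computed on the rescaled values.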

matej-macak commented 4 years ago

I have noticed that part of the slowness can be explained by parameter tuning (i.e. the system is very sensitive to train_batch_size, num_sgd_iter, rollout_fragment_length and sgd_minibatch_size). I assume that, because these are deterministic one-step environments, it is better not to have a final update batch size larger than the number of workers, as the batch probably contains a high number of repeated actions and steps, which leads to slower training.

Lowering these parameters, however, leads to a quicker memory leak and crash, which I reported in #8473.
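To see how these parameters interact, the sketch below estimates the number of minibatch gradient steps per training iteration. The formula is a simplification that I am assuming for illustration, not RLlib's exact bookkeeping. Of the values used, only `num_sgd_iter` = 30 is quoted in this thread; the batch sizes are illustrative.

```python
import math


def sgd_updates_per_iteration(train_batch_size, sgd_minibatch_size, num_sgd_iter):
    """Approximate number of minibatch gradient steps per training iteration."""
    minibatches_per_epoch = math.ceil(train_batch_size / sgd_minibatch_size)
    return num_sgd_iter * minibatches_per_epoch


# A large setting: many samples collected and many passes over them.
large = sgd_updates_per_iteration(
    train_batch_size=4000, sgd_minibatch_size=128, num_sgd_iter=30)

# A much smaller illustrative setting.
small = sgd_updates_per_iteration(
    train_batch_size=64, sgd_minibatch_size=16, num_sgd_iter=4)

print("updates per iteration (large):", large)
print("updates per iteration (small):", small)
```

Because every episode in this environment lasts one step, `train_batch_size` is also the number of episodes that must be sampled before each round of updates, so smaller values mean more frequent policy updates per unit of wall time.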

sven1977 commented 4 years ago

I think I found a quite satisfying hyperparam solution, which now converges quite fast (in under 1 min). There will be a PR today that also gets rid of the Box-limit problem in our PPO (Box(0.0, 1.0) learns ok, but e.g. Box(1.0, 3.0) doesn't). ...

sven1977 commented 4 years ago

Here is the config that works quite well on my end now. It basically simplifies everything a lot. But then again, it's also a very simple env.

    config = {
        "num_workers": 0,
        "entropy_coeff": 0.00001,
        "num_sgd_iter": 4,
        "vf_loss_coeff": 0.0,
        #"vf_clip_param": 100.0,
        #"grad_clip": 1.0,
        "lr": 0.0005,
        # State doesn't matter -> Set both gamma and lambda to 0.0.
        "lambda": 0.0,
        "gamma": 0.0,
        "clip_param": 0.1,
        "kl_coeff": 0.0,
        "train_batch_size": 64,
        "sgd_minibatch_size": 16,
        "normalize_actions": True,
        "clip_actions": False,
        # Use a very simple Model for faster convergence.
        "model": {
            "fcnet_hiddens": [8],
        },
        # Works with either framework: True for PyTorch, False for TensorFlow.
        "use_pytorch": True,
    }

Closing this issue.

sven1977 commented 4 years ago

On another note: It could also be that baselines treats the action space as type=int ... in which case it would be easier to reach the 1000 reward, b/c you only had two choices (0 and 1) for each action component.

matej-macak commented 4 years ago

Hi @sven1977, the action_space is of type float. I have checked the action vector in the step and it is not producing binary results. I think the hyperparameter search definitely helps. I have been working on a similar, more complex problem that inspired this question, and my finding is that RLlib seems to have a more stable but slower convergence than stable baselines, which is faster but can get stuck in local minima a bit more easily.

AJSVB commented 2 years ago

I was reading this issue and wondered if there was information on the RLlib website about choosing the HPs. I don't think I found anything in the documentation saying that, if the state doesn't matter, we can set both gamma and lambda to 0.0.
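One way to see why gamma and lambda are safe to zero out in this particular environment: every TestEnv episode ends after a single step. Under a textbook GAE formulation (an assumption here; this is a standalone sketch, not RLlib's implementation), the advantage then collapses to reward minus value estimate, whatever gamma and lambda are.

```python
import numpy as np


def gae_advantages(rewards, values, dones, gamma, lam):
    """Generalized advantage estimation over a flat sequence of steps.

    `dones[t]` marks the last step of an episode; the value after a terminal
    step is taken to be zero. The final step is assumed to be terminal.
    """
    advantages = np.zeros(len(rewards))
    next_value, next_advantage = 0.0, 0.0
    for t in reversed(range(len(rewards))):
        not_done = 0.0 if dones[t] else 1.0
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        advantages[t] = delta + gamma * lam * not_done * next_advantage
        next_value, next_advantage = values[t], advantages[t]
    return advantages


# Three one-step episodes, as in TestEnv (done is always True).
rewards = np.array([510.0, 495.0, 530.0])
values = np.array([500.0, 500.0, 500.0])  # illustrative value estimates
dones = [True, True, True]

adv_discounted = gae_advantages(rewards, values, dones, gamma=0.99, lam=0.95)
adv_zero = gae_advantages(rewards, values, dones, gamma=0.0, lam=0.0)
print(adv_discounted)
print(adv_zero)
```

Both calls yield the same advantages, so for strictly one-step episodes the choice mainly removes a source of confusion. In environments where the state does matter across steps, this equivalence no longer holds.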