ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RLlib] Memory leaks during RLlib training. #8469

Closed tinymindxxx closed 1 year ago

tinymindxxx commented 4 years ago

What is the problem?

Ray version and other system information (Python version, TensorFlow version, OS): OS: Docker on CentOS; Ray: 0.8.4; Python: 3.6

Reproduction (REQUIRED)

Please provide a script that can be run to reproduce the issue. The script should have no external library dependencies (i.e., use fake or mock data / environments):

If we cannot run your script, we cannot fix your issue.

Recently, we found that our RL model trained with RLlib depletes memory and throws an OOM error. When I run an RLlib DQN model as below, memory usage grows over time.

rllib train --run=DQN --env=Breakout-v0 --config='{"output": "dqn_breakout_1M/", "output_max_file_size": 50000000,"num_workers":3}' --stop='{"timesteps_total": 1000000}' 

Memory grows as time goes on (see the attached memory-usage plot).

Hope someone can help.

ericl commented 4 years ago

Does it still happen if you set the buffer size really small, or don't use the output option?
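
For reference, a sketch of the kind of run Eric suggests (it drops the output options and uses DQN's buffer_size config key; the small value here is arbitrary):

rllib train --run=DQN --env=Breakout-v0 --config='{"buffer_size": 1000, "num_workers": 3}' --stop='{"timesteps_total": 1000000}'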

tinymindxxx commented 4 years ago

@ericl Sorry for not replying for so long; I've been busy with other work recently. After a few trials, I found that the rollout workers may be the root cause of the memory leak. The command below only removes "num_workers": 3 from the config, and without rollout workers there is no sign of a memory leak after running for a while.

rllib train --run=DQN --env=Breakout-v0 --config='{"output": "dqn_breakout_100M", "output_max_file_size": 50000000 }' --stop='{"timesteps_total": 1000000}' 


Besides, we found that during offline data training the memory leak occurs when we set num_workers > 1, and there is no sign of a leak with num_workers=0.

So I guess there may be a bug in the rollout worker. I will try to locate it more precisely.

ericl commented 4 years ago

Oh, that might be because setting num_workers enables distributed mode. We've fixed some memory leaks in 0.8.5, so it's worth upgrading to see whether that helps.
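
For example, assuming a pip-based install:

pip install -U "ray[rllib]==0.8.5"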

wullli commented 4 years ago

I experience the same problem with APEX-DQN running in local mode with multiple workers. Memory usage rises linearly, and the experiments eventually fail with RayOutOfMemoryError.

I have tried setting buffer_size to a smaller value, though even after some investigation of the docs I did not figure out what exactly the number is supposed to mean (is it the number of samples or bytes?), and it did not stop the memory error.

The traceback shows RolloutWorker occupying 56 of 64 GB. Feels like a memory leak to me.

Running on 0.8.5

alpteko commented 4 years ago

I have the same issue. I tried decreasing the buffer size, but memory is still growing. Can you please suggest a solution if you solve this problem?

jackbart94 commented 3 years ago

Did anyone manage to track down the memory leak in RolloutWorker? This issue is still present in the latest Ray.

cfusting commented 3 years ago

+1 running PPO.

BFAnas commented 3 years ago
jackvice commented 3 years ago

+1 running SAC

holinov commented 3 years ago

+1 running SAC. UPD: this happens only on Windows; the same training configuration on Linux does not leak.

alvaro-serra commented 3 years ago

+1 running PPO

perara commented 3 years ago

@ericl, are you certain this is not an RLlib bug? I've attached a minimal script that overflows memory in a matter of minutes on my system. At first I suspected the LocalReplayBuffer was at fault, but after stripping quite a lot away from my code it is still leaking memory. I cannot rule out that I'm doing something horribly wrong in my setup (someone with a clue should comment on that), but it is at least not straightforward to avoid this if it isn't a relatively serious bug.


import copy
import gym
from typing import List
import numpy as np
import ray
from ray import tune
from ray.rllib import SampleBatch
from ray.rllib.agents import with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution import ParallelRollouts, TrainOneStep, \
    StandardMetricsReporting, SelectExperiences, ConcatBatches
from ray.rllib.models import ModelCatalog, ModelV2
from ray.rllib.models.tf import TFModelV2
from ray.rllib.models.tf.tf_action_dist import Deterministic
from ray.rllib.policy import build_tf_policy
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.view_requirement import ViewRequirement
from ray.rllib.utils import override, try_import_tf
from ray.rllib.utils.typing import TrainerConfigDict, TensorType, ModelConfigDict

tf1, tf, tfv = try_import_tf()

class MyFancyModel(TFModelV2):

    def __init__(self, obs_space: gym.spaces.Space, action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str, config: dict, **customized_model_kwargs):
        super().__init__(obs_space, action_space, num_outputs, model_config, name)
        self.config = config
        self.batch_size = config["sgd_minibatch_size"]
        self.batch_length = config["num_framestacks"]

        # Perform Atari framestacking via traj. view API.
        if self.batch_length > 1:
            self.data_format = "channels_first"
            self.traj_view_framestacking = True
        else:
            self.data_format = "channels_last"
            # Define the flag in both branches so the check below cannot raise AttributeError.
            self.traj_view_framestacking = False

        if self.traj_view_framestacking:
            from_ = self.batch_length - 1

            self.view_requirements[SampleBatch.OBS].shift = "-{}:0".format(from_)
            self.view_requirements[SampleBatch.OBS].shift_from = -from_
            self.view_requirements[SampleBatch.OBS].shift_to = 0
            self.view_requirements[SampleBatch.NEXT_OBS] = ViewRequirement(
                data_col=SampleBatch.OBS,
                shift="-{}:1".format(from_ - 1),
                space=self.view_requirements[SampleBatch.OBS].space,
                used_for_compute_actions=False,
            )
            self.view_requirements[SampleBatch.ACTIONS] = ViewRequirement(
                data_col=SampleBatch.ACTIONS,
                shift="-{}:1".format(from_ - 1),
                space=action_space,
                used_for_compute_actions=False,
            )

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens) -> (TensorType, List[TensorType]):
        outputs = None
        return outputs, [outputs]

    @staticmethod
    def loss_fn(policy, self, dist_class, train_batch):
        output, _ = self(train_batch)
        loss = tf.cast(0.0, dtype=tf.float32)
        return loss, []

    @staticmethod
    def action_distribution_fn(policy, self, input_dict, state_batches, seq_lens, explore, timestep, is_training):
        actions = np.array([self.action_space.sample() for _ in input_dict["obs"]])
        return actions, Deterministic, []

    @staticmethod
    def make_model(policy: Policy,
                   obs_space: gym.spaces.Space,
                   action_space: gym.spaces.Space,
                   config: TrainerConfigDict):

        policy.model = ModelCatalog.get_model_v2(
            obs_space=obs_space,
            action_space=action_space,
            num_outputs=1,
            model_config=config["model"],
            framework=config["framework"],
            config=config
        )
        policy.model_variables = policy.model.variables()
        return policy.model

def execution_plan_ppo(workers: WorkerSet, config: TrainerConfigDict):
    rollouts = ParallelRollouts(workers, mode="bulk_sync")

    # Collect large batches of relevant experiences & standardize.
    rollouts = rollouts.for_each(
        SelectExperiences(workers.trainable_policies()))
    rollouts = rollouts.combine(
        ConcatBatches(min_batch_size=config["train_batch_size"]))

    train_op = rollouts.for_each(
        TrainOneStep(
            workers,
            num_sgd_iter=config["num_sgd_iter"],
            sgd_minibatch_size=config["sgd_minibatch_size"]
        )
    )

    return StandardMetricsReporting(train_op, workers, config)

# https://docs.ray.io/en/master/rllib-training.html
DEFAULT_CONFIG = with_common_config({
    # "sgd_minibatch_size": 32,
    # "rollout_fragment_length": 32,
    # "log_level": "debug",
    "env": "CarRacing-v0",
    "num_workers": 0,

    # "num_iters": 20000,
    "learning_starts": 100,
    "buffer_size": 10000,

    "sample_batch_size": 32,
    "train_batch_size": 1024,

    "num_framestacks": 16,
    "framestack": -1,
    "sgd_minibatch_size": 32,
    "num_sgd_iter": 1,

    # Env Settings
    # "observation_filter": NormalizeObservation,
    # "render_env": False,
    # "monitor": True,

    "batch_mode": "complete_episodes",
    "model": {
        "custom_model": "MyFancyModel",
        "custom_model_config": {}
    },

    # Tensorflow Settings
    "framework": "tf2",
    "eager": False,
    "eager_tracing": True,
    "tf_session_args": {
        "gpu_options": {
            "allow_growth": True,
        },
        "allow_soft_placement": True,  # required by PPO multi-gpu
    },
})

MyFancyPolicy = build_tf_policy(
    "MyFancyPolicy",
    loss_fn=MyFancyModel.loss_fn,
    make_model=MyFancyModel.make_model,
    action_distribution_fn=MyFancyModel.action_distribution_fn,
    # obs_include_prev_action_reward=False
)

MyTrainer = build_trainer(
    name="MyTrainer",
    get_policy_class=lambda config: MyFancyPolicy,
    default_policy=MyFancyPolicy,
    default_config=DEFAULT_CONFIG,
    execution_plan=execution_plan_ppo,
)

if __name__ == "__main__":
    ModelCatalog.register_custom_model(
        "MyFancyModel", MyFancyModel
    )

    config = copy.deepcopy(DEFAULT_CONFIG)
    config.update({
        "num_gpus": 1,
        "num_workers": 5,
        "num_envs_per_worker": 1,
        "env": "CarRacing-v0",
        "env_config": {},  # config to pass to env class
        "eager": False
    })
    ray.init(num_cpus=12, num_gpus=1)
    tune.run(
        MyTrainer,
        config=config,
        verbose=2,
        reuse_actors=True
    )

ericl commented 3 years ago

@sven1977 could you take a look here? Btw what TF version are you using? There are definitely TF versions that leak memory (<2.1 iirc).

perara commented 3 years ago

@eric-jj I'm running on 2.4.1. I have, however, not encountered these leaks in my own (non-distributed) training loop.

sven1977 commented 3 years ago

It does seem to only happen in non-local mode. Also, when I switch batch_mode back to truncate_episodes, there is no leaking (even in non-local mode). ...
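
For anyone reproducing with the script above, that workaround is a one-line config change (a sketch, keyed on the script's DEFAULT_CONFIG):

# In the reproduction script above, after config = copy.deepcopy(DEFAULT_CONFIG):
config["batch_mode"] = "truncate_episodes"  # instead of "complete_episodes"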

ericl commented 3 years ago

[closing as stale]

Mingrui-Yu commented 3 years ago

Is this problem solved? I think I'm hitting the same problem when running SAC with num_workers > 1.

chazzmoney commented 2 years ago

Supposedly this has been fixed via #15815. However, I still see memory leaks when running multi-worker training.

sven1977 commented 2 years ago

We have a PR in review that adds memory-leak tests to all major RLlib algos. We are currently testing it to make sure it doesn't detect false positives, but the tool is pretty good at finding leaks, however small.

https://github.com/ray-project/ray/pull/15412
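
Independent of that tooling, a crude way to check for per-iteration growth is to poll the driver's RSS with psutil. A rough sketch, assuming the Ray 2.x DQNConfig API and a toy CartPole setup:

import os
import psutil
import ray
from ray.rllib.algorithms.dqn import DQNConfig

ray.init()
algo = DQNConfig().environment("CartPole-v1").rollouts(num_rollout_workers=2).build()
proc = psutil.Process(os.getpid())
for i in range(50):
    algo.train()
    # A steadily climbing RSS across iterations hints at a leak on the driver;
    # worker memory has to be checked separately (e.g. via the Ray dashboard).
    print(f"iter {i}: driver RSS = {proc.memory_info().rss / 1e6:.1f} MB")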

jjyyxx commented 2 years ago

@sven1977 Thanks for your great work! The PR https://github.com/ray-project/ray/pull/15412 indeed fixed something. I was recently troubled by memory leaks in RLlib when using the "tf2" framework. "tf" works fine, but "tf2" is much easier to work with when developing and experimenting with new algorithms.

https://github.com/ray-project/ray/pull/15412 was merged 4 days ago, after the v1.12 release. I wonder when a new version will be released?
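
For context, the framework comparison above is just a config flag, e.g. in any of the reproduction configs in this thread:

config["framework"] = "tf"     # static-graph TF; did not leak in the setup described above
# config["framework"] = "tf2"  # eager TF2; the setup that leaked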

ArturNiederfahrenhorst commented 1 year ago

Sven's PR has been merged and we have been running memory-leak tests for a while now. Since this thread has become inactive, I assume this is no longer an issue.

rmalone1097 commented 1 year ago

Still having this issue with DQN in 2.3.0

Mark2000 commented 1 year ago

Likewise having this issue with APPO on 2.5.1 and 2.6.3

ArturNiederfahrenhorst commented 1 year ago

@rmalone1097 @Mark2000 are you saying you are experiencing memory leaks in general? Can you craft a reproduction script that we can copy/paste and run?

Mark2000 commented 1 year ago

I'll see if I can find time to put that together; for now I'm just using a higher tier on our computing cluster that grants more memory as a workaround. For some context, here's what I asked on the Ray Slack:

I’m currently experiencing a memory leak using APPO with the tf2 backend and Tune (Ray 2.5.1 and 2.6.3). To ensure it’s not my gym environment leaking memory on reset, I’ve added base_env.vector_env.restart_at(0) in the on_episode_end callback, which causes the environment to be deleted and recreated (making training slower relative to the wall clock, but memory still leaks at roughly the same rate per step). I run out of memory on a cluster with 3.8 GB/worker after 1.5M-5M steps.

Here are the params passed to APPO:

  clip_param: 0.2
  entropy_coeff: 0.0
  gamma: 0.999
  grad_clip: 0.5
  lambda_: 0.95
  learner_queue_timeout: 900
  lr: 0.0003
  minibatch_size: 250
  model:
    fcnet_hiddens:
    - 2048
    - 2048
    vf_share_layers: false
  num_sgd_iter: 10
  train_batch_size: 10000
  use_gae: true
  vtrace: false

The only other modification I'm making is advantage normalization with a callback:

    def on_postprocess_trajectory(...):
        if "advantages" in postprocessed_batch:
            adv = postprocessed_batch["advantages"]
            postprocessed_batch["advantages"] = (adv - np.mean(adv)) / (
                np.std(adv) + 1e-8
            )
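
Putting the two pieces above together, a self-contained sketch of such a callbacks class (assuming the DefaultCallbacks signatures of Ray 2.x; the class name is made up):

import numpy as np
from ray.rllib.algorithms.callbacks import DefaultCallbacks


class RestartAndNormalizeCallbacks(DefaultCallbacks):
    """Hypothetical callback combining the env-restart workaround and advantage normalization."""

    def on_episode_end(self, *, worker, base_env, policies, episode, env_index=None, **kwargs):
        # Delete and recreate sub-env 0 to rule out env-side leaks
        # (assumes the BaseEnv wraps a VectorEnv, as described above).
        base_env.vector_env.restart_at(0)

    def on_postprocess_trajectory(self, *, worker, episode, agent_id, policy_id,
                                  policies, postprocessed_batch, original_batches, **kwargs):
        # Normalize the advantages of each postprocessed batch.
        if "advantages" in postprocessed_batch:
            adv = postprocessed_batch["advantages"]
            postprocessed_batch["advantages"] = (adv - np.mean(adv)) / (np.std(adv) + 1e-8)

It would be attached to the algorithm via the config, e.g. config.callbacks(RestartAndNormalizeCallbacks).
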
jesuspc commented 1 year ago

Seeing memory leaks in TD3 as well...

(screenshot: memory usage over time, showing a drop partway through)

The drop corresponds to a process restore from checkpoint.

binakn commented 1 year ago

I also see a memory leak during RLlib training, using SAC in combination with PettingZoo MPE.

Even for longer runs of over 5 days the memory keeps growing linearly, adding over 70 GB on top of the already quite substantial memory consumption from the start.

Version Details for my Docker Image:

Reproduction Script for Minimal Example 1):

import os 

import ray 
from ray import air, tune  # tune is imported explicitly so that ray.tune.Tuner below resolves
from ray.rllib.algorithms.sac import SAC, SACConfig

from ray.rllib.env.wrappers.pettingzoo_env import PettingZooEnv
from pettingzoo.mpe import simple_v2

from ray.tune.registry import register_env

# determine checkpoint directory
Dir = os.getcwd() 
checkpoint_dir = os.path.join(Dir, 'ray_results')

#### Environment Setup
def env_creator(env_config): 
        env = PettingZooEnv(simple_v2.env(**env_config))
        return env

register_env("simple_v2", env_creator)

#### Initialize Ray
ray.init(local_mode=False, num_gpus=1, logging_level="info")

# hyperparameter search 
config = (SACConfig()
            .training(replay_buffer_config = {
                          "capacity": int(1e6) 
                      },
            )
            .rollouts(num_rollout_workers=4) 
            .resources(num_gpus=1)
            .environment(env="simple_v2")
            .framework('torch')
)

# ``Tuner.fit()`` allows setting a custom log directory (other than ``~/ray-results``)
tuner = ray.tune.Tuner(
    SAC,
    param_space=config,
    run_config=air.RunConfig(
        stop={"training_iteration": 500},
        checkpoint_config=air.CheckpointConfig(checkpoint_at_end=True,
                                               checkpoint_frequency=300),
        local_dir = checkpoint_dir
    ),
)

# run the experiment trial until the stopping criteria is met
results = tuner.fit() 

Grafana shows the RSS memory usage going from 46.9 GB to 53.4 GB, i.e. an increase of 6.5 GB.

I also tried reducing the number of workers and the replay memory size, but this didn't make the problem go away (config sketches below):

2) num_rollout_workers=0 instead of 4 ==> increase of 1.2 GB for the single local worker.
3) capacity: int(10) instead of the default int(1e6) ==> increase of 1.6 GB across local & remote workers.
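
For reference, variants 2) and 3) correspond to the following tweaks of the SACConfig builder from the script above (a sketch; copies are taken so the original config is not mutated in place):

import copy

config_no_workers = copy.deepcopy(config).rollouts(num_rollout_workers=0)                    # variant 2)
config_tiny_buffer = copy.deepcopy(config).training(replay_buffer_config={"capacity": 10})   # variant 3)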

The execution order for the chart was 1) -> 2) -> 3), with starting times of 14:06, 14:38 and 16:17 respectively. See the full Grafana chart below: even with the very small replay buffer, memory still increases significantly.

(full Grafana chart covering all three runs)

Any ideas what is happening here?

Mark2000 commented 1 year ago

I’ve narrowed down my issues to a leaky environment (or at least, my environment leaks enough that it is probably more significant than the algorithm). @jesuspc how did you trigger the process restore from checkpoint? That would be a good workaround for me until I’m able to track down my environment leak.

jesuspc commented 1 year ago

@Mark2000 I restore a stopped tune run with:

tune.run(
  "TD3", # same as the original run
  name="<folder_name_containing_the_trial_data>", # for instance TD3_2023-10-23_18-17-45 
  local_dir="<absolute_path_to_the_local_dir_of_the_original_run>", 
  resume=True
)

You can also use resume="ERRORED_ONLY" instead if you only need to restart failed trials. In my experience the restored trial works well in some cases, but in others it behaves very differently from the original one. See for instance the following plot for a trial that was restored at ~600k steps, after which the reward curve displays a very different profile.

(screenshot: reward curve of the restored trial, diverging after ~600k steps)

Using Ray 2.7.1 + torch 2.1.0+cu118 on an Ubuntu 22.04 system.