DLR-RM / stable-baselines3

PyTorch version of Stable Baselines, reliable implementations of reinforcement learning algorithms.
https://stable-baselines3.readthedocs.io
MIT License

Deadlock when running model.learn on a SubprocVecEnv #1814

Open 1-Bart-1 opened 7 months ago

1-Bart-1 commented 7 months ago

🐛 Bug

When running model.learn on a SubprocVecEnv as follows:

env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
model = SAC(**kwargs, env=env)
model.learn(N_TIMESTEPS, callback=eval_callback)

the program ends up in a deadlock. This is likely because I am using a custom environment that runs Julia code via juliacall. A workaround is to change CloudpickleWrapper to a DillWrapper as follows:

In stable_baselines3/common/vec_env/base_vec_env.py:

import dill

class DillWrapper:
    def __init__(self, var: Any):
        self.var = var

    def __getstate__(self) -> Any:
        return dill.dumps(self.var)

    def __setstate__(self, var: Any) -> None:
        self.var = dill.loads(var)

Then use DillWrapper instead of CloudpickleWrapper in stable_baselines3/common/vec_env/subproc_vec_env.py (see the sketch below). With this change, the deadlock seems to occur less often, but it is not completely gone.
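For context, a rough sketch of where that substitution goes in SubprocVecEnv.__init__ (paraphrased from the current source, so the exact lines may differ between SB3 versions):

# stable_baselines3/common/vec_env/subproc_vec_env.py, inside SubprocVecEnv.__init__:
# each env_fn is wrapped before being handed to its worker process.
for work_remote, remote, env_fn in zip(self.work_remotes, self.remotes, env_fns):
    args = (work_remote, remote, DillWrapper(env_fn))  # was CloudpickleWrapper(env_fn)
    process = ctx.Process(target=_worker, args=args, daemon=True)
    process.start()
    self.processes.append(process)
    work_remote.close()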

I am running the code with the following command:

apptainer exec \
  --env PYTHON_JULIACALL_SYSIMAGE=/cluster/home/bartva/BOAT/Simulation/Kite/trainer/.JlEnv.so \
  --bind /cluster/work/bartva \
  --nv \
  .kite-app.sif python -u BOAT/Simulation/Kite/trainer/hyperparam-tuning.py \
  --trials 500 \
  --startup_trials 5 \
  --evaluations 5 \
  --steps 200000 \
  --episodes 10 \
  --cpus 0 \
  --verbose 2

Code example

KiteEnv.py:

import os
from shutil import copy, rmtree
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import random
import yaml
from uuid import uuid4
from .utils import get_args

args = get_args()

curdir = os.path.dirname(__file__)
path = os.path.join(curdir, "../.JlEnv.so")
main_data_dir = "/cluster/work/bartva/data"

class KiteEnv(gym.Env):

  def __init__(self):        
    self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
            # range:
            #   depower: [0, 1], steering [-1, 1]
            # conversion: 
    self.observation_space = spaces.Box(low=-1, high=1, shape=(9,), dtype=np.float32)
            # [w_old, x_old, y_old, z_old, w, x, y, z, force]

    # Import juliacall here so the Julia runtime is initialized in the process
    # that constructs the env (each SubprocVecEnv worker builds its own env).
    from juliacall import Main as jl
    jl.seval("using Pkg")
    jl.Pkg.activate(os.path.join(curdir, "../Environment"))
    jl.seval("using Environment")  # this is a custom package I built from Environment.jl

    self.Environment = jl.Environment

    self.data_dir = os.path.join(main_data_dir, str(uuid4()))
    os.makedirs(self.data_dir)
    copy(os.path.join(main_data_dir, "system.yaml"), 
                os.path.join(self.data_dir, "system.yaml"))
    copy(os.path.join(main_data_dir, "settings.yaml"), 
                os.path.join(self.data_dir, "settings.yaml"))
    print("Made directory: ", self.data_dir)
    self.Environment.set_data_path(self.data_dir)

    self.verbose = args.verbose
    self.no_random = args.no_random
    self.render_mode = None  

  def _get_real_action(self, action):
    action = np.squeeze(action)  # squeeze returns a new array, so keep the result
    power = action[0] / 2 + 0.5
    steering = action[1]
    return np.array([power, steering])

  def _normalize_obs(self, observation):
    observation = np.squeeze(observation)  # keep the squeezed result
    max_force = 4000
    normalized_force = 2*observation[8]/max_force - 1
    return np.append(observation[0:8], normalized_force)

  def reset(self, seed=None, options=None):
    super().reset(seed=seed)

    reset_complete = False
    reset_value = None
    while reset_complete == False:
      try:
        with open(os.path.join(main_data_dir, "settings.yaml"), 'r') as file:
          settings = yaml.safe_load(file)

        if (not self.no_random):
          settings['initial']['elevation'] = random.uniform(65, 70)

        with open(os.path.join(self.data_dir, "settings.yaml"), 'w') as file:
          yaml.safe_dump(settings, file)

        reset_value = self.Environment.reset()[:4]
        reset_complete = True
      except Exception as e:
        print(e)
        print(self.observation)
        reset_complete = False

    self.cumulative_force = 0.0
    self.observation = self._normalize_obs(
      np.append(
        reset_value[:4],
        self.Environment.get_next_step(0.5,0.0)
      )
    )
    # self.step_count = 0
    return np.array(self.observation, dtype=np.float32), {}

  def step(self, action):
    action = self._get_real_action(action)
    # self.step_count += 1
    # done = self.step_count >= self.steps_per_episode
    done = False

    reward = 0.0
    try:
      self.observation = self._normalize_obs(
        np.append(
        self.observation[4:8],
        self.Environment.get_next_step(float(action[0]), float(action[1]))
        )
      )
      force = self.observation[8]
      reward = force
    except Exception as e:
      if(self.verbose >= 3):
        print("ERROR")
        print(e)
        print(float(action[0]),  float(action[1]))
      done = True
      reward = -10

    return np.array(self.observation, dtype=np.float32), reward, done, False, {}

  def close(self):
    try:
      rmtree(self.data_dir)
      print("Removed directory: ", self.data_dir)
    except Exception as e:
      print(e)

hyperparam-tuning.py:

# from multiprocessing import set_start_method, get_start_method
import optuna
# from dask.distributed import Client
# from dask.distributed import wait
from typing import Any
from typing import Dict
import time
import psutil
import gymnasium as gym
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import EvalCallback

from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env

from Components.KiteEnv import KiteEnv
from Components import utils
from multiprocessing import cpu_count

args = utils.get_args()
if args.cpus == 0:
    cpus = cpu_count()
else:
    cpus = args.cpus
print("CPUs: ", cpus)

N_TRIALS = args.trials
N_STARTUP_TRIALS = args.startup_trials
N_EVALUATIONS = args.evaluations
N_TIMESTEPS = args.steps
EVAL_FREQ = int(N_TIMESTEPS / N_EVALUATIONS)
N_EVAL_EPISODES = args.episodes

ENV_ID = "KiteEnv-v0"
gym.envs.registration.register(
    id=ENV_ID,
    entry_point=KiteEnv,
)

def sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]:
    """
    Sampler for SAC hyperparams.

    :param trial:
    :return:
    """
    gamma = trial.suggest_categorical("gamma", [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999])
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 0.1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128, 256, 512, 1024, 2048])
    buffer_size = trial.suggest_categorical("buffer_size", [int(1e4), int(1e5), int(1e6)])
    learning_starts = trial.suggest_categorical("learning_starts", [0, 1000, 10000, 20000])
    # train_freq = trial.suggest_categorical('train_freq', [1, 10, 100, 300])
    train_freq = trial.suggest_categorical("train_freq", [1, 4, 8, 16, 32, 64, 128, 256, 512])
    # Polyak coeff
    tau = trial.suggest_categorical("tau", [0.001, 0.005, 0.01, 0.02, 0.05, 0.08])
    # gradient_steps takes too much time
    # gradient_steps = trial.suggest_categorical('gradient_steps', [1, 100, 300])
    gradient_steps = train_freq
    # ent_coef = trial.suggest_categorical('ent_coef', ['auto', 0.5, 0.1, 0.05, 0.01, 0.0001])
    ent_coef = "auto"
    # You can comment that out when not using gSDE
    log_std_init = trial.suggest_float("log_std_init", -4, 1)
    # NOTE: Add "verybig" to net_arch when tuning HER
    net_arch_type = trial.suggest_categorical("net_arch", ["small", "medium", "big"])
    # activation_fn = trial.suggest_categorical('activation_fn', [nn.Tanh, nn.ReLU, nn.ELU, nn.LeakyReLU])

    net_arch = {
        "small": [256, 256],
        "medium": [400, 300],
        "big": [500, 400, 300],
    }[net_arch_type]

    target_entropy = "auto"
    # if ent_coef == 'auto':
    #     # target_entropy = trial.suggest_categorical('target_entropy', ['auto', 5, 1, 0, -1, -5, -10, -20, -50])
    #     target_entropy = trial.suggest_float('target_entropy', -10, 10)

    return {
        "gamma": gamma,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "buffer_size": buffer_size,
        "learning_starts": learning_starts,
        "train_freq": train_freq,
        "gradient_steps": gradient_steps,
        "ent_coef": ent_coef,
        "tau": tau,
        "target_entropy": target_entropy,
        "policy_kwargs": dict(log_std_init=log_std_init, net_arch=net_arch),
    }

class TrialEvalCallback(EvalCallback):
    """Callback used for evaluating and reporting a trial."""

    def __init__(
        self,
        eval_env: gym.Env,
        trial: optuna.Trial,
        n_eval_episodes: int = 5,
        eval_freq: int = 10000,
        deterministic: bool = True,
        verbose: int = 0,
    ):
        super().__init__(
            eval_env=eval_env,
            n_eval_episodes=n_eval_episodes,
            eval_freq=eval_freq,
            deterministic=deterministic,
            verbose=verbose,
        )
        self.trial = trial
        self.eval_idx = 0
        self.is_pruned = False

    def _on_step(self) -> bool:
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            super()._on_step()
            self.eval_idx += 1
            self.trial.report(self.last_mean_reward, self.eval_idx)
            # Prune the trial if needed.
            if self.trial.should_prune():
                self.is_pruned = True
                return False
        return True

def objective(trial: optuna.Trial, env: gym.Env, eval_env: gym.Env) -> float:
    default_hyperparams = {"policy": "MlpPolicy", "env": env}
    kwargs = default_hyperparams.copy()
    kwargs.update(sample_sac_params(trial))    
    model = SAC(**kwargs, verbose=args.verbose)

    eval_callback = TrialEvalCallback(
        eval_env, trial, n_eval_episodes=N_EVAL_EPISODES, eval_freq=int(EVAL_FREQ/cpus), deterministic=True
    )

    nan_encountered = False
    try:
        print("Starting learn...")
        model.learn(N_TIMESTEPS, callback=eval_callback)
    except (AssertionError, ValueError) as e:
        # Sometimes, random hyperparams can generate NaN.
        if(args.verbose >= 2):
            print("ERROR")
            print(e)
        nan_encountered = True

    # Tell the optimizer that the trial failed.
    if nan_encountered:
        return float("nan")

    if eval_callback.is_pruned:
        raise optuna.exceptions.TrialPruned()

    return eval_callback.last_mean_reward

if __name__ == "__main__":
    # set_start_method("spawn", force=True)

    # print("Start_method: ", get_start_method())
    # gym.envs.registration.register(
    #     id=ENV_ID,
    #     entry_point=KiteEnv,
    # )
    try:
        start_time = time.time()
        sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)
        pruner = MedianPruner(n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=N_EVALUATIONS // 3)
        study = optuna.create_study(sampler=sampler, pruner=pruner, direction="maximize")

        env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
        # env = SubprocVecEnv([make_env(ENV_ID, i) for i in range(cpus)], start_method="spawn")
        # eval_env = SubprocVecEnv([make_env(ENV_ID, i) for i in range(cpus)], start_method="spawn")
        eval_env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
        try:
            print("Starting optimization...")
            study.optimize(lambda trial: objective(trial, env, eval_env), n_trials=N_TRIALS)
        except KeyboardInterrupt:
            pass

        print("Number of finished trials: ", len(study.trials))
        print("Best trial:")
        trial = study.best_trial
        print("  Value: ", trial.value)
        print("  Params: ")
        for key, value in trial.params.items():
            print("    {}: {}".format(key, value))
        print("  User attrs:")
        for key, value in trial.user_attrs.items():
            print("    {}: {}".format(key, value))
        print(f"Time used: {time.time()-start_time}")
    except Exception as e:
        print(e)
    finally:
        env.close()
        eval_env.close()

Environment.jl (Has to be built as a custom package!)

module Environment

using Timers; tic()
using KiteModels
using KiteUtils
# using PyCall #removed pycall!!
# using Plots

const Model = KPS4

set_data_path(joinpath(@__DIR__, "../../Simulator/data"))
kcu = KCU(se());
kps4 = Model(kcu);
dt = 1/se().sample_freq
steps = 1000
step = 0
logger = Logger(se().segments + 5, steps) 

GC.gc();
toc();

integrator = KiteModels.init_sim!(kps4, stiffness_factor=0.04);

function get_next_step(depower, steering)
    global step
    depower = Float32(depower)
    steering = Float32(steering)

    v_ro = 0.0

    if depower < 0.22; depower = 0.22; end
    set_depower_steering(kps4.kcu, depower, steering)

    t_sim = 0.0
    open("next_step_io.txt", "w") do io
        redirect_stdout(io) do
            t_sim = @elapsed KiteModels.next_step!(kps4, integrator, v_ro=v_ro, dt=dt)
        end
    end

    GC.gc(false)

    sys_state = SysState(kps4)
    step += 1

    return sys_state.orient[1], sys_state.orient[2], sys_state.orient[3], sys_state.orient[4], sys_state.force
end

function reset()
    global kcu
    global kps4
    global integrator
    global step
    global sys_state
    update_settings()
    save_log(logger)
    kcu = KCU(se());
    kps4 = Model(kcu);
    integrator = KiteModels.init_sim!(kps4, stiffness_factor=0.04)
    step = 0
    sys_state = SysState(kps4)
    GC.gc();
    return sys_state.orient[1], sys_state.orient[2], sys_state.orient[3], sys_state.orient[4], sys_state.force
end

function render()
    global sys_state, logger, step, steps
    if(step < steps)
        log!(logger, SysState(kps4))
    end
end

end

Relevant log output / Error message

After 1-2 trials, there is a deadlock.
When using the DillWrapper, the deadlock happens after around 7 trials instead.
When using the "spawn" start method together with the DillWrapper, the problem seems to be solved (sketch below).

System Info

I am running the code in an Ubuntu Apptainer container on a high-performance cluster: https://www.hpc.ntnu.no/idun/


araffin commented 7 months ago

Hello, thanks for the bug report and the proposed solution. I'm not sure we can do much on the SB3 side, as this seems to be a very specific use case.

1-Bart-1 commented 7 months ago

You could add an option to use DillWrapper instead of CloudpickleWrapper as the serialization wrapper for SubprocVecEnv.
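For example (the wrapper_class argument below is hypothetical, it does not exist in the current SB3 API):

# Hypothetical API sketch, not part of SB3 today:
env = SubprocVecEnv(env_fns, start_method="spawn", wrapper_class=DillWrapper)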


araffin commented 7 months ago

Could you please run python -c 'import stable_baselines3 as sb3; sb3.get_system_info()' and also give the version of dill that was tested?

1-Bart-1 commented 7 months ago

I installed stable-baselines3 from https://github.com/DLR-RM/stable-baselines3 at master.

pip show dill | grep Version
Version: 0.3.7
