rte-france / Grid2Op

Grid2Op a testbed platform to model sequential decision making in power systems.
https://grid2op.readthedocs.io/
Mozilla Public License 2.0
281 stars 115 forks source link

grid2op.make(...) not threadsafe? #626

Open DEUCE1957 opened 2 months ago

DEUCE1957 commented 2 months ago

Environment

How to reproduce

import grid2op
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
def make_env():
    env = grid2op.make("rte_case14_realistic")
    obs = env.reset()
    print(obs)
    return obs

futures = set()
with ThreadPoolExecutor(max_workers=mp.cpu_count()-1) as executor:
    for i in range(mp.cpu_count() - 1):
        futures.add(executor.submit(make_env))
for item in futures:
    print(item)

Will usually throw a Backend error or TypeError on a subset of the function calls. These errors correspond to attributes of the Backend being None or -1 (i.e. Backend was not successfully initialized).

Current output

Some of the threads return errors:

<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66b26650 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48a90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2f690 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66c9e6d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d10 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2d110 state=finished raised TypeError>
<Future at 0x1db66e2c3d0 state=finished raised ValueError>
<Future at 0x1db6b5097d0 state=finished raised ValueError>
<Future at 0x1db6accfd50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6ac94550 state=finished returned CompleteObservation_rte_case14_realistic>
<Future at 0x1db6d581b50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d4b9d0 state=finished returned CompleteObservation_unknown>

Expected output

All threads successfully return the initial observation:

<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66b26650 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48a90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2f690 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66c9e6d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d10 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2d110 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2c3d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6b5097d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6accfd50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6ac94550 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6d581b50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d4b9d0 state=finished returned CompleteObservation_unknown>

Temporary Fix

Put grid2op.make(...) inside a for loop with try-except (not recommended!):

N_ATTEMPTS = 10
for _ in range(N_ATTEMPTS):
  try:
      env = grid2op.make(dataset=self.env_name, backend=env_params.backend(), **grid2op_params)
      break
  except: # Resource is busy, wait
      time.sleep(1.0)
DEUCE1957 commented 2 months ago

The runner in Grid2Op clearly does work and supports parallel processing so I looked inside of it, it does not use Grid2op.make but instead initializes the environment using init_obj_from_kwargs(...):

Line 653 in runner.py:

res = self.envClass.init_obj_from_kwargs(
                n_busbar=self._n_busbar,
                other_env_kwargs=self.other_env_kwargs,
                init_env_path=self.init_env_path,
                init_grid_path=self.init_grid_path,
                chronics_handler=chronics_handler,
                backend=backend,
                parameters=parameters,
                name=self.name_env,
                names_chronics_to_backend=self.names_chronics_to_backend,
                actionClass=self.actionClass,
                observationClass=self.observationClass,
                rewardClass=self.rewardClass,
                legalActClass=self.legalActClass,
                voltagecontrolerClass=self.voltageControlerClass,
                other_rewards=self._other_rewards,
                opponent_space_type=self._opponent_space_type,
                opponent_action_class=self.opponent_action_class,
                opponent_class=self.opponent_class,
                opponent_init_budget=self.opponent_init_budget,
                opponent_budget_per_ts=self.opponent_budget_per_ts,
                opponent_budget_class=self.opponent_budget_class,
                opponent_attack_duration=self.opponent_attack_duration,
                opponent_attack_cooldown=self.opponent_attack_cooldown,
                kwargs_opponent=self.opponent_kwargs,
                with_forecast=self.with_forecast,
                attention_budget_cls=self._attention_budget_cls,
                kwargs_attention_budget=self._kwargs_attention_budget,
                has_attention_budget=self._has_attention_budget,
                logger=self.logger,
                kwargs_observation=self._kwargs_observation,
                observation_bk_class=self._observation_bk_class,
                observation_bk_kwargs=self._observation_bk_kwargs,
                _raw_backend_class=self.backendClass,
                _read_from_local_dir=self._read_from_local_dir,
            )
BDonnot commented 1 month ago

Hello,

First I think there are errors on the "expected output" that you wrote is not correct. You wrote:

<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown> <Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>

The "unknown" at the end is definitely not normal. It should have been the name of the environment. If it works it should display :

<Future at 0x1db68531fd0 state=finished returned CompleteObservation_rte_case14_realistic> <Future at 0x1db710ad850 state=finished returned CompleteObservation_rte_case14_realistic>

Then, to adress more specifically the question, probably, if you do some things using thread, you should first init the grid (so that the classes are initialized, such that CompleteObservation_rte_case14_realistic is initialized once when you create the initial environment). And then you multiprocessing some of the things.

But be aware that "multi threading" in python does not work really well because of the GIL (global interpreter lock, feel free to google it I will not describe it here :-) ). But maybe the concurrent library handles that for you. If I were you I would prefer to use the multiprocessing library.

Basically, what is probably happening here is that grid2op tries to create the same class (for example Environment_rte_case14_realistic multiple times and concurrently (at the same time) which breaks some of python internal representation (I am not sure we should do that) and it is probably why sometimes you get errors and sometimes not.

In the runner, I use multiprocess library to perform asynchronous evaluation. I also use multiprocess library in the BaseMultiProcessEnvironment class. I never tested it with something else.

Finally, I use this way of doing things in the runner because you might have different environment class (eg MaskedEnvironment or TimeOutEnvironment) which cannot be created (yet) with a call to make. For now you have to do things like

import grid2op
from grid2op.Environment import MaskedEnvironment

env_name = "l2rpn_case14_sandbox"
lines_of_interest = np.array([True, True, True, True, True, True,
                                False, False, False, False, False, False,
                                False, False, False, False, False, False,
                                False, False])
env = MaskedEnvironment(grid2op.make(env_name),
                        lines_of_interest=lines_of_interest)

This way the "child" class of Environment (MaskedEnvironment in this case) can benefit from the Runner transparently because the init_obj_from_kwargs is overloaded properly :-)

Hope that helps

BDonnot commented 1 month ago

After looking at this, I managed to make it "work" with something like:

import grid2op
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def make_init_env():
    env = grid2op.make("l2rpn_case14_sandbox")
    _ = env.reset()
    return env

def make_env(env_init):
    env = env_init.copy()
    obs = env.reset()
    print(obs)
    return obs

if __name__ == "__main__":
    env_init = make_init_env()

    futures = set()
    with ThreadPoolExecutor(max_workers=mp.cpu_count()-1) as executor:
        for i in range(mp.cpu_count() - 1):
            futures.add(executor.submit(make_env, env_init.copy()))

        for item in futures:
            print(item.result())
DEUCE1957 commented 1 month ago

Thank you Benjamin, I did not realize threading in Python was fundamentally difficult to do because of the Global Interpreter Lock. I usually opt for the multiprocessing library as well (I think this issue is present there as well though, so your solution should also be needed there). However the concurrent library is what is used by Optuna - which is where the trouble began. Passing around an initialized environment is a bit tricky in my case since we have 3 environments (train, val, test) but I think we can make it work.

/ Xavier

BDonnot commented 1 month ago

If you don't do the env_init it does not really work (on my linux machine)

If you don't copy the environment when calling make_env:

futures.add(executor.submit(make_env, env_init)) # DON'T DO: DOES WORK

It does not work either.

I don't know if you can adapt your overall code to work like this.

Anyway, my recommendation would be : 1) optimize everything that can be in single core (I don't really know what to do, but there are certainly a lot of things you can optimize in grid2op, see for example https://grid2op.readthedocs.io/en/latest/data_pipeline.html or lightsim2grid backend (lightsim2grid.readthedocs.io/) 2) optimize everything outside of grid2op that can be, for example if you use RL make sure that the time you spent in your model is not higher than the time spent in grid2op (you can compare for that the time it takes to do say 1000 forward pass with grid2op on your model to the time it takes grid2op to do 1000 "random" actions). With some RL framework you can be surprised 3) only then try to do "asynchronous" things (and prefer in this case multiprocessing and not multi threading)

Finally do not hesitate to have a look at https://grid2op.readthedocs.io/en/master/troubleshoot.html#experimental-read-from-local-dir. It might work in your case

DEUCE1957 commented 1 month ago

While the minimal example with the ThreadpoolExecutor you shared does work on my Windows machine, unfortunately it does not seem I can adapt it to work in Optuna. I don't think there is a direct solution to be found since putting .copy() on attributes passed to the objective() function in Optuna still ends up running this locally in each thread (rather than outside in a for loop):

import optuna
import psutil
import joblib

def objective(trial, train_env, val_env, test_env):
    train_env = train_env.copy()
    val_env = val_env.copy()
    test_env = test_env.copy()
    return 0.0
...
safe_objective = lambda trial: objective(trial, train_env.copy(), val_env.copy(), test_env.copy())
n_cores = psutil.cpu_count(logical=False)
with joblib.parallel_config(backend="multiprocessing", n_jobs = n_cores):
    study = optuna.create_study()
    study.optimize(safe_objective, n_trials=100, n_jobs=n_cores)

The old hack does still work, but based on your comment I did some quick speed testing and it is actually not any faster to use the multiprocessing than run in one thread (since it takes about 1/10th the time per episode on 1 thread compared to 10 threads). This is in part because multiple threads cannot make use of the single GPU on my system. Given this, I may as well focus on optimizing the 1 thread (for example, with the MultiFolderWithCache from data_pipeline documentation you sent).

Nevertheless, in the case of hyperparameter optimization having parallel trials does still seem advantageous to me since then the hyperparameter space can be explored much more efficiently (assuming each trial is about as fast as a single trial by itself). May return to this again in the future.