hill-a / stable-baselines

A fork of OpenAI Baselines, implementations of reinforcement learning algorithms
http://stable-baselines.readthedocs.io/
MIT License

How to predict in multiprocess environment #334

Closed · hn2 closed this issue 5 years ago

hn2 commented 5 years ago

I am trying to use a trained model to predict on new data. The problem is that when I use multiple CPUs (12), I get a matrix of actions instead of a single row: I get one row for every spawned process. Do I need to average the results column-wise? Is this the correct approach?

    import numpy as np
    from stable_baselines import PPO2
    from stable_baselines.common.vec_env import SubprocVecEnv

    # multiprocess environment
    n_cpu = 12
    env = PortfolioEnv(history=history, abbreviation=instruments, steps=settings['steps'], window_length=settings['window_length'])
    env = SubprocVecEnv([lambda: env for i in range(n_cpu)])

    mdl = 'currencies_20050101_20180101_10000_3000_5_return'

    model = PPO2.load(mdl)

    # initialized here
    obs = env.reset()
    states = model.initial_state  # get the initial state vector for the recurrent network
    dones = np.zeros(states.shape[0])  # set all environments to not done

    action, _states = model.predict(obs, states, dones)

    weights = np.mean(action, axis=0)
hn2 commented 5 years ago

Can someone please answer?

hill-a commented 5 years ago

I believe your answer is in the documentation: https://stable-baselines.readthedocs.io/en/master/guide/vec_envs.html

print(obs.shape) will display (n_cpu,) + obs.shape as you have 12 environments over 12 CPUs. Here is an alteration that switches back to a single environment when you are not using recurrent policies:

    from stable_baselines.common.vec_env import DummyVecEnv

    # multiprocess environment
    n_cpu = 12
    env = PortfolioEnv(history=history, abbreviation=instruments, steps=settings['steps'], window_length=settings['window_length'])
    env = SubprocVecEnv([lambda: env for i in range(n_cpu)])

    mdl = 'currencies_20050101_20180101_10000_3000_5_return'

    model = PPO2.load(mdl)

    # close the old multiprocess env
    env.close()
    del env

    # make a new dummy vec env with a single environment
    env = PortfolioEnv(history=history, abbreviation=instruments, steps=settings['steps'], window_length=settings['window_length'])
    env = DummyVecEnv([lambda: env])

    # initialized here
    obs = env.reset()
    states = model.initial_state  # get the initial state vector for the recurrent network
    dones = np.zeros(states.shape[0])  # set all environments to not done

    weights, _states = model.predict(obs, states, dones)

If you are using recurrent policies, then you must train on the same number of environments as you use for prediction; I believe it is a current limitation of stable baselines.

hill-a commented 5 years ago

Also, make sure you have read how to use recurrent policies: https://stable-baselines.readthedocs.io/en/master/guide/examples.html#recurrent-policies

it's

weights, states = model.predict(obs, states, dones)

not

weights, _states = model.predict(obs, states, dones)

otherwise states won't be updated after each prediction.
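For reference, a minimal prediction-loop sketch with the state fed back in (model and env are the loaded PPO2 model and the vectorized environment from above; n_steps is just a placeholder horizon):

    import numpy as np

    obs = env.reset()
    states = model.initial_state              # LSTM states for all parallel envs
    dones = np.zeros((env.num_envs,))         # no environment is done at the start

    for _ in range(n_steps):
        # feed the returned states back in on the next call
        actions, states = model.predict(obs, state=states, mask=dones)
        obs, rewards, dones, infos = env.step(actions)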

hn2 commented 5 years ago

Ok but that was my question:

"One current limitation of recurrent policies is that you must test them with the same number of environments they have been trained on."

So I have 12 environments in both training and testing, and I get 12 actions from the prediction. How do I use them? Take the average?

hill-a commented 5 years ago

No, the 12 actions correspond to the 12 observations you gave the model. This is equivalent to batch training. Please read the vectorized environment page: https://stable-baselines.readthedocs.io/en/master/guide/vec_envs.html
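Concretely, each row of the action batch is applied to its own environment when you step the vectorized env (hedged sketch, same names as above):

    # row i of `actions` is the action for environment i -- do not average them
    actions, states = model.predict(obs, state=states, mask=dones)
    print(actions.shape)                       # (12,) + action_space.shape
    obs, rewards, dones, infos = env.step(actions)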

hn2 commented 5 years ago

You wrote above:

print(obs.shape) will display (n_cpu,) + obs.shape as you have 12 environments over 12 CPUs

I do not understand. My model does not have 12 observations.

self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=((len(abbreviation) + 1) * window_length * history.shape[-1],), dtype=np.float32)

I have 7 abbreviations + 1 (for cash), the window length is 5, and history.shape[-1] is 5 (open, high, low, close, volume).

12 can only come from the number of environments or CPUs.

hill-a commented 5 years ago

print(obs.shape) will display (n_cpu,) + obs.shape as you have 12 environments over 12 CPUs.

This is because when you do:

env = SubprocVecEnv([lambda: env for i in range(n_cpu)])

you are creating n_cpu independent environments in parallel, each with its own scope. This means that env returns n_cpu observations at a time, so you end up with (n_cpu,) + obs.shape as the output.

In reinforcement learning, the environment is often the slowest part, and it is also the easiest part to parallelize. That is why SubprocVecEnv acts only on the environment and returns the observations from all of the environments.

In fact, you can see this in the shape of the observations returned before and after wrapping with SubprocVecEnv:

    print(env.reset().shape)  # (8 * 5 * 5,)
    env = SubprocVecEnv([lambda: env for i in range(n_cpu)])
    print(env.reset().shape)  # (12, 8 * 5 * 5)
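The same effect can be reproduced with any standard Gym environment, e.g. a quick, self-contained check with CartPole (illustration only, not your PortfolioEnv):

    import gym
    from stable_baselines.common.vec_env import DummyVecEnv

    single_env = gym.make('CartPole-v1')
    print(single_env.reset().shape)            # (4,)

    vec_env = DummyVecEnv([lambda: gym.make('CartPole-v1') for _ in range(12)])
    print(vec_env.reset().shape)               # (12, 4)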
hn2 commented 5 years ago

So what should I do? I get: ValueError: could not broadcast input array from shape (12,8) into shape (8)

hill-a commented 5 years ago

If you are using recurrent policies, then you must train on the same number of environments as you use for prediction; I believe it is a current limitation of stable baselines.

Well, if you want to use recurrent policies, then train without SubprocVecEnv. If you are not using recurrent policies, then simply use a single observation as input, and the model will output a single action.

So what should I do? I get: ValueError: could not broadcast input array from shape (12,8) into shape (8)

I have no idea what code you just ran, so how could I know what is wrong?
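For the non-recurrent case, the suggestion above amounts to something like this (hedged sketch; the PortfolioEnv arguments are taken from your own snippet):

    from stable_baselines.common.vec_env import DummyVecEnv

    # predict on a single environment, even if training used SubprocVecEnv
    pred_env = DummyVecEnv([lambda: PortfolioEnv(history=history, abbreviation=instruments,
                                                 steps=settings['steps'],
                                                 window_length=settings['window_length'])])
    obs = pred_env.reset()
    action, _ = model.predict(obs)             # no state/mask needed without an LSTM
    weights = action[0]                        # drop the batch dimension of size 1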

hn2 commented 5 years ago

Without a multiprocess environment, learning is very slow:

    from stable_baselines import PPO2
    from stable_baselines.common.policies import LstmPolicy
    from stable_baselines.common.vec_env import SubprocVecEnv

    n_cpu = 1  # 12
    env = PortfolioEnv(history=history, abbreviation=instruments, steps=settings['steps'], window_length=settings['window_length'], include_ta=settings['include_ta'], allow_short=settings['allow_short'], reward=settings['reward'])
    env = SubprocVecEnv([lambda: env for i in range(n_cpu)])

    class CustomPolicy(LstmPolicy):
        def __init__(self, *args, **kwargs):
            super(CustomPolicy, self).__init__(*args, **kwargs,
                                               net_arch=[1,
                                                         'lstm',
                                                         dict(pi=[128, 128, 128, 128, 128, 128, 128, 128, 128, 128],
                                                              vf=[128, 128, 128, 128, 128, 128, 128, 128, 128, 128])],
                                               layer_norm=True, feature_extraction="mlp")

    model = PPO2(CustomPolicy, env, nminibatches=1, verbose=0, tensorboard_log=settings['tensorboard_log'])
    model.learn(total_timesteps=settings['total_timesteps'])
hill-a commented 5 years ago

Without a multiprocess environment, learning is very slow

Yes, the point of multiprocessing is to speed up the learning.

If you still want to use multiprocessing, then during prediction you can fill the observation batch with zeros except for the first observation, and take only the first action.
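A hedged sketch of that zero-padding trick (single_obs stands for the one real observation you care about; model was trained on n_cpu environments):

    import numpy as np

    n_cpu = 12
    obs_batch = np.zeros((n_cpu,) + single_obs.shape, dtype=np.float32)
    obs_batch[0] = single_obs                  # only the first row is a real observation

    states = model.initial_state
    dones = np.zeros((n_cpu,))
    actions, states = model.predict(obs_batch, state=states, mask=dones)
    weights = actions[0]                       # keep only the action for the real observation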

hn2 commented 5 years ago

One last question for this thread: should training and prediction always be done on the same architecture?

hill-a commented 5 years ago

One last question for this thread: should training and prediction always be done on the same architecture?

The saved models should be machine invariant, and can be switched from CPU to GPU or from GPU to CPU.

hn2 commented 5 years ago

But it must be the same number of workers right?

hill-a commented 5 years ago

But it must be the same number of workers right?

Correct, for recurrent models.

hn2 commented 5 years ago

Is PPO2 with one CPU worker equivalent to PPO1?

hill-a commented 5 years ago

Already answered in #365; no, they are different.

araffin commented 5 years ago

For LSTM policies predict, related: https://github.com/hill-a/stable-baselines/issues/166#issuecomment-502350843