rlworkgroup / garage

A toolkit for reproducible reinforcement learning research.
MIT License

CEM and CMA don't support parallel sampling from TF policy #460

Closed amolchanov86 closed 5 years ago

amolchanov86 commented 5 years ago

Currently, both ES algorithms are still under the original branch. I managed to run them with some minor modifications (I added session initialization) with a single sampler, but running with multiple samplers causes problems (which completely undermines the advantage of ES). Are there any immediate plans to move both algorithms into the main TF branch?

ryanjulian commented 5 years ago

They are not in garage.tf because (to my knowledge) they do not depend on TensorFlow.

Can you post the error messages you got? These are both tested by the CI, but perhaps some of the support code (e.g. the sampler and/or plotter) has TF dependencies?

amolchanov86 commented 5 years ago

When one has to extract parameters from a policy, one has to run a session. The absence of a default session is what it complained about initially (that is what I fixed by simply creating a session in the training routine). But that only works with a single sampler. When I try to increase n_parallel, it keeps complaining about the lack of a default session. I haven't dug into that yet, hoping that maybe there was a plan to fix it. Here is the full error I get now:

(garage) artem@artempc:~/prj/quad_metalearn/quad_dynalearn/quad_dynalearn$ ./train_garage_quad.py config/cem_quad.conf _results_temp/cem_quad_test --n_parallel 2
Reading parameter file config/cem_quad.conf ...
###############################################################
### PARAMETERS LOADED FROM CONFIG FILES (Later updated by arguments provided)
{'seed': 1, 'variant': {'env': 'QuadrotorEnv', 'alg_class': 'CEM', 'alg_param': {'max_path_length': 100, 'n_itr': 10, 'n_samples': 100, 'best_frac': 0.05, 'init_std': 0.1, 'plot': False, 'play_every_itr': 1, 'play_rollouts_num': 1}, 'policy_class': 'GaussianMLPPolicy', 'policy_param': {'hidden_sizes': [32, 32]}}}
+++++++++++++++++++++++++++++++++++++++++++++++++++
PARAMETERS TUPLE:  None ()  SEED:  1
python /home/artem/prj/drl/garage/scripts/run_experiment.py  --seed '1'  --n_parallel '2'  --snapshot_mode 'last'  --plot 'False'  --exp_name 'experiment_2019_01_09_14_39_27_0001'  --log_dir '_results_temp/cem_quad_test/seed_001/'  --use_cloudpickle 'True'  --args_data 'gASV3wUAAAAAAACMF2Nsb3VkcGlja2xlLmNsb3VkcGlja2xllIwOX2ZpbGxfZnVuY3Rpb26Uk5QoaACMD19tYWtlX3NrZWxfZnVuY5STlGgAjA1fYnVpbHRpbl90eXBllJOUjAhDb2RlVHlwZZSFlFKUKEsBSwBLC0sGS0NCbAEAAGQBZAJsAG0BfQEBAGQBZANsAm0DfQIBAGQBZARsBG0FfQMBAGQBZAVsBm0HfQQBAGQBZAZsCG0JfQUBAGQBZAdsCm0LfQYBAHwAZAgZAGQJawJyZnwCdAxkCmQLZAyNAoMBfQduFnwCdA10DmoPfABkCBkAgwGDAYMBfQd8AGQIPQB0EIMAfABkDRkAGQBmAGQOfAdqEWkBfABkDxkAlwKOAX0IfABkDT0AfABkDz0AfABkEBkAZBFrA5ABchJ0EIMAfABkEhkAGQBmAGQOfAdqEWkBfABkExkAlwKOAX0JfABkEj0AfABkEz0AdBCDAHwAZBAZABkAZgB8B3wIfAlkFJwDfABkFRkAlwKOAX0KbiJ0EIMAfABkEBkAGQBmAHwHfAhkFpwCfABkFRkAlwKOAX0KfABkED0AfABkFT0AfABkFz0AfABpAGsCkAFzYHQSZBh0E3wAgwEWAIMBggF8CmoUgwABAGQZUwCUKIxWCiAgICBXcmFwIFBQTyB0cmFpbmluZyB0YXNrIGluIHRoZSBydW5fdGFzayBmdW5jdGlvbi4KCiAgICA6cGFyYW0gXzoKICAgIDpyZXR1cm46CiAgICCUSwCME0dhdXNzaWFuTUxQQmFzZWxpbmWUhZSMBVRmRW52lIWUjBFHYXVzc2lhbk1MUFBvbGljeZSFlIwDQ0VNlIWUjANQUE+UhZSMBFRSUE+UhZSMA2VudpSMDFF1YWRyb3RvckVudpSIiYwLcmF3X2NvbnRyb2yUjAp0Zl9jb250cm9slIaUjAxwb2xpY3lfY2xhc3OUjAhlbnZfc3BlY5SMDHBvbGljeV9wYXJhbZSMCWFsZ19jbGFzc5RoEowOYmFzZWxpbmVfY2xhc3OUjA5iYXNlbGluZV9wYXJhbZRoGIwGcG9saWN5lIwIYmFzZWxpbmWUh5SMCWFsZ19wYXJhbZRoGGgjhpSMCGV4cF9uYW1llIwxRVJST1I6IFNvbWUgb2YgcGFyYW1ldGVyIHZhbHVlcyB3ZXJlIG5vdCB1c2VkOiAlc5ROdJQojBNnYXJhZ2UudGYuYmFzZWxpbmVzlGgMjA5nYXJhZ2UudGYuZW52c5RoDowSZ2FyYWdlLnRmLnBvbGljaWVzlGgQjBhxdWFkX2R5bmFsZWFybi5hbGdvcy5jZW2UaBKME2dhcmFnZS50Zi5hbGdvcy5wcG+UaBSMFGdhcmFnZS50Zi5hbGdvcy50cnBvlGgWaBmMCW5vcm1hbGl6ZZSMA2d5bZSMBG1ha2WUjAZsb2NhbHOUjARzcGVjlIwOQXNzZXJ0aW9uRXJyb3KUjANzdHKUjAV0cmFpbpR0lCiMCnRhc2tfcGFyYW2UaAxoDmgQaBJoFGgWaBhoI2gkjARhbGdvlHSUjBYuL3RyYWluX2dhcmFnZV9xdWFkLnB5lIwIcnVuX3Rhc2uUS0tDPgAHDAEMAQwCDAEMAQwCDAESAhYBBgIiAQYBBgIOASIBBgEGAg4BAgECAQYBDgIOAQIBBgEMAgYBBgUGARoClCkpdJRSlEr/////jAhfX21haW5fX5SHlFKUfZQojAdnbG9iYWxzlH2UKGgZjCNneW1fYXJ0LnF1YWRyb3Rvci5xdWFkcm90b3JfbW9kdWxhcpRoGZOUaDGMHGd5bV9hcnQucXVhZHJvdG9yLnF1YWRfdXRpbHOUaDGTlGgyaACMCXN1YmltcG9ydJSTlGgyhZRSlHWMCGRlZmF1bHRzlE6MBGRpY3SUfZSMDmNsb3N1cmVfdmFsdWVzlE6MBm1vZHVsZZRoQowEbmFtZZRoPowDZG9jlGgLjAhxdWFsbmFtZZRoPnV0Ui4='  --variant_data 'gAN9cQAoWAMAAABlbnZxAVgMAAAAUXVhZHJvdG9yRW52cQJYCQAAAGFsZ19jbGFzc3EDWAMAAABDRU1xBFgJAAAAYWxnX3BhcmFtcQV9cQYoWA8AAABtYXhfcGF0aF9sZW5ndGhxB0tkWAUAAABuX2l0cnEISwpYCQAAAG5fc2FtcGxlc3EJS2RYCQAAAGJlc3RfZnJhY3EKRz+pmZmZmZmaWAgAAABpbml0X3N0ZHELRz+5mZmZmZmaWAQAAABwbG90cQyJWA4AAABwbGF5X2V2ZXJ5X2l0cnENSwFYEQAAAHBsYXlfcm9sbG91dHNfbnVtcQ5LAXVYDAAAAHBvbGljeV9jbGFzc3EPWBEAAABHYXVzc2lhbk1MUFBvbGljeXEQWAwAAABwb2xpY3lfcGFyYW1xEX1xElgMAAAAaGlkZGVuX3NpemVzcRNdcRQoSyBLIGVzWAgAAABleHBfbmFtZXEVWCMAAABleHBlcmltZW50XzIwMTlfMDFfMDlfMTRfMzlfMjdfMDAwMXEWdS4='
2019-01-09 14:39:31 | Setting seed to 1
2019-01-09 14:39:31 | Setting seed to 2
2019-01-09 14:39:31 | tensorboard data will be logged into:_results_temp/cem_quad_test/seed_001/
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
2019-01-09 14:39:36.348996: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-01-09 14:39:36 | [experiment_2019_01_09_14_39_27_0001] Populating workers...
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
*** Error in `python': double free or corruption (fasttop)
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/artem/prj/drl/garage/garage/sampler/stateful_pool.py", line 186, in _worker_run_each
    return runner(singleton_pool.G, *args)
  File "/home/artem/prj/drl/garage/garage/sampler/parallel_sampler.py", line 40, in _worker_populate_task
    g.policy = pickle.loads(policy)
  File "/home/artem/prj/drl/garage/garage/tf/core/parameterized.py", line 107, in __setstate__
    tf.get_default_session().run(
AttributeError: 'NoneType' object has no attribute 'run'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda2/envs/garage/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/opt/anaconda2/envs/garage/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "/home/artem/prj/drl/garage/garage/sampler/stateful_pool.py", line 188, in _worker_run_each
    raise Exception("".join(traceback.format_exception(*sys.exc_info())))
Exception: Traceback (most recent call last):
  File "/home/artem/prj/drl/garage/garage/sampler/stateful_pool.py", line 186, in _worker_run_each
    return runner(singleton_pool.G, *args)
  File "/home/artem/prj/drl/garage/garage/sampler/parallel_sampler.py", line 40, in _worker_populate_task
    g.policy = pickle.loads(policy)
  File "/home/artem/prj/drl/garage/garage/tf/core/parameterized.py", line 107, in __setstate__
    tf.get_default_session().run(
AttributeError: 'NoneType' object has no attribute 'run'

"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/artem/prj/drl/garage/scripts/run_experiment.py", line 242, in <module>
    run_experiment(sys.argv)
  File "/home/artem/prj/drl/garage/scripts/run_experiment.py", line 185, in run_experiment
    method_call(variant_data)
  File "./train_garage_quad.py", line 125, in run_task
    algo.train()
  File "/home/artem/prj/quad_metalearn/quad_dynalearn/quad_dynalearn/algos/cem.py", line 137, in train
    parallel_sampler.populate_task(self.env, self.policy)
  File "/home/artem/prj/drl/garage/garage/sampler/parallel_sampler.py", line 58, in populate_task
    ] * singleton_pool.n_parallel)
  File "/home/artem/prj/drl/garage/garage/sampler/stateful_pool.py", line 72, in run_each
    return results.get()
  File "/opt/anaconda2/envs/garage/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: Traceback (most recent call last):
  File "/home/artem/prj/drl/garage/garage/sampler/stateful_pool.py", line 186, in _worker_run_each
    return runner(singleton_pool.G, *args)
  File "/home/artem/prj/drl/garage/garage/sampler/parallel_sampler.py", line 40, in _worker_populate_task
    g.policy = pickle.loads(policy)
  File "/home/artem/prj/drl/garage/garage/tf/core/parameterized.py", line 107, in __setstate__
    tf.get_default_session().run(
AttributeError: 'NoneType' object has no attribute 'run'

ryanjulian commented 5 years ago

@naeioi thoughts?

naeioi commented 5 years ago

@amolchanov86 Thanks for reporting. CEM itself depends on neither tf nor theano. I notice that you are using GaussianMLPPolicy, which is a tensorflow policy; that's why you need to initialize a tensorflow session manually for the policy to use. @ryanjulian CI does not break because it only tests CEM with a theano policy.

@amolchanov86 Can you also post your train_garage_quad.py? The magic run_experiment sets up several processes and instructs them to run the callable method_call passed to it. I suspect that your tf session was not initialized within method_call.
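
For illustration, a minimal, untested sketch of that idea: the session is created and entered inside the callable itself, so the default session exists in the process where run_experiment actually executes method_call. It reuses the API names that appear elsewhere in this thread; the CEM import path and hyperparameters mirror the config above, and this is a sketch, not a drop-in fix (parallel sampling has a separate problem, discussed below).

import gym
import tensorflow as tf

from garage.envs import normalize
from garage.experiment import run_experiment
from garage.tf.envs import TfEnv
from garage.tf.policies import GaussianMLPPolicy


def run_task(_):
    # Enter the session here so tf.get_default_session() is set in the
    # process that run_experiment spawns for method_call.
    with tf.Session() as sess:
        env = TfEnv(normalize(gym.make("Pendulum-v0")))
        policy = GaussianMLPPolicy(env_spec=env.spec, hidden_sizes=(32, 32))
        sess.run(tf.global_variables_initializer())
        from quad_dynalearn.algos.cem import CEM  # the issue author's copy
        algo = CEM(env=env, policy=policy, max_path_length=100,
                   n_itr=10, n_samples=100)
        algo.train()


run_experiment(run_task, n_parallel=1)  # works; n_parallel > 1 still fails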

amolchanov86 commented 5 years ago

I see, so what you are saying is that creating a session in the training routine itself is too late, and it should be created earlier?

Here is my full train_garage_quad.py:

#!/usr/bin/env python
"""
This is a parametrized script to run TRPO/PPO 
with a custom env
"""
import argparse
import sys
import os
import datetime, time
import itertools
import os.path as osp
import uuid
import copy

import numpy as np

import dateutil.tz
import yaml

import gym

from garage.envs import normalize
from garage.experiment import run_experiment

# Custom stuff
import quad_dynalearn.config.config_loader as conf
import quad_dynalearn.misc.variants_utils as vu

########################################################################
## ARGUMENTS
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("config_file", help='yaml file with default settings of parameters')
parser.add_argument("log_dir", default='_results_temp/trpo_ppo_last', help='Directory to log into')
parser.add_argument("--seed", '-s', default=None, help='list of seeds to use separated by comma (or a single seed w/o comma). If None seeds from config_file will be used')
parser.add_argument("--n_parallel", '-n', type=int, default=1, help='Number of parallel workers to run a single task')
parser.add_argument("--snapshot_mode", '-snm', default='last', help='Snapshot mode. Opt: last')
parser.add_argument("--plot", '-plt', action="store_true", help='Plotting')
parser.add_argument("--param_name", '-p', help='task hyperparameter names separated by comma')
parser.add_argument("--param_val", '-pv', help='task hyperparam values.'+ 
                    ' For a single par separated by comma.' +
                    ' For adjacent params separated by double comma.' +
                    '   Ex: \"-p par1,par2 -pv pv11,pv12,,pv21,pv22\"' + 
                    '   where pv11,pv12 - par values for par1 , pv21,pv22 - par values for par2')
args = parser.parse_args()

########################################################################
## PARAMETERS (non grid)
# Loading parameters not specified in the arguments
print('Reading parameter file %s ...' % args.config_file)
params = conf.trpo_ppo_default_params()
with open(args.config_file, 'r') as yaml_stream:
    params_new = yaml.safe_load(yaml_stream)
params.update(params_new)
print('###############################################################')
print('### PARAMETERS LOADED FROM CONFIG FILES (Later updated by arguments provided)')
print(params)

## Get a grid of task variations and put it into list as parameter dictionaries
## WARN: when you add more parameters to add_arguments you will have to modify grid_of_variants()
variants_list = vu.grid_of_variants(args, params)

## Saving command line executing the script
cmd = " ".join(sys.argv)
if not os.path.isdir(args.log_dir):
    os.makedirs(args.log_dir)
with open(args.log_dir + os.sep + "cmd.sh", "w") as cmdfile:
    cmdfile.write("#!/usr/bin/bash\n")
    cmdfile.write(cmd)

def run_task(task_param):
    """
    Wrap the training task in the run_task function.

    :param task_param: dict of task parameters (env, policy, algo, ...)
    :return:
    """
    from garage.tf.baselines import GaussianMLPBaseline
    from garage.tf.envs import TfEnv
    from garage.tf.policies import GaussianMLPPolicy, DeterministicMLPPolicy
    from garage.tf.algos.trpo import TRPO

    from quad_dynalearn.algos.cem import CEM
    from quad_dynalearn.algos.cma_es import CMAES
    from quad_dynalearn.algos.ppo import PPO

    if task_param["env"] == "QuadrotorEnv":
        # from gym_art.quadrotor.quadrotor_control import *
        from gym_art.quadrotor.quadrotor_modular import QuadrotorEnv
        env = TfEnv(QuadrotorEnv(**task_param["env_param"]))
        del task_param["env_param"]
    else:
        env = TfEnv(normalize(gym.make(task_param["env"])))
    del task_param["env"]

    policy = locals()[task_param["policy_class"]](env_spec=env.spec, **task_param["policy_param"])
    del task_param["policy_class"]
    del task_param["policy_param"]

    if task_param["alg_class"] != "CEM" and task_param["alg_class"] != "CMAES":
        baseline = locals()[task_param["baseline_class"]](env_spec=env.spec, **task_param["baseline_param"])
        del task_param["baseline_class"]
        del task_param["baseline_param"]

        algo = locals()[task_param["alg_class"]](
            env=env,
            policy=policy,
            baseline=baseline,
            **task_param["alg_param"])
    else:
        algo = locals()[task_param["alg_class"]](
            env=env,
            policy=policy,
            **task_param["alg_param"])

    del task_param["alg_class"]
    del task_param["alg_param"]

    # Check that we used all parameters.
    # This helps reveal situations where you thought you set a certain
    # parameter but in fact made a spelling mistake and it was silently ignored.
    del task_param["exp_name"]  # This is probably generated by garage
    assert task_param == {}, "ERROR: Some of parameter values were not used: %s" % str(task_param)

    algo.train()

start_time = time.time()
for var in variants_list:
    ## Dumping config
    with open(var["log_dir"] + os.sep + "config.yml", 'w') as yaml_file:
        yaml_file.write(yaml.dump(var, default_flow_style=False))

    ## Running
    run_experiment(
        run_task,
        **var
    )

end_time = time.time()
print("##################################################")
print("Total Runtime: ", end_time - start_time)

naeioi commented 5 years ago

Yes. I assume the training routine you are referring to is the implementation of CEM, where there's nothing related to the tf session. So you have to create it outside, before algo.train() is called.

I made some changes to your script. There are some deps I don't have, so I cannot test it, but it should work. Could you give it a try? https://gist.github.com/naeioi/28bb467abf6a598e814bdb136f7c2230

naeioi commented 5 years ago

@amolchanov86 I'd like to know whether you have solved your problem.

amolchanov86 commented 5 years ago

Hi, sorry, I switched to other things and just missed your previous post. I will try it today and report back. Thanks a lot for the help!

amolchanov86 commented 5 years ago

I tried your example and got the same result (i.e. it works with one worker, fails if n_parallel > 1). I even substituted the CEM version I had with the original one, hence I think that if the bug is eliminated in the original version, my code should also work. Or maybe I am doing something wrong; if so, please share a script that successfully runs CEM/CMAES with n_parallel > 1 and TF policies. Thanks a lot for the support!

naeioi commented 5 years ago

@amolchanov86 After some debugging, I found that both CEM and CMAES work only with a theano policy when doing parallel sampling.

The root cause is that a tf session is not fork-safe, meaning that one tf session cannot be shared by multiple processes. Algos under garage.tf use tf-specific samplers to manage an isolated session in each worker, and since CEM and CMAES don't use those samplers, they cannot work with a tf policy.
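
To make the fork-safety point concrete, here is a standalone toy (plain multiprocessing plus TF 1.x, not garage code) showing the per-worker pattern that the tf-specific samplers implement: each child process builds and enters its own session after the fork instead of reusing one inherited from the parent.

import multiprocessing as mp
import tensorflow as tf


def worker(x):
    # Build a fresh session in this child process. Running ops through a
    # session object created before the fork would fail (or corrupt memory,
    # as in the "double free or corruption" crash above).
    with tf.Session() as sess:
        return sess.run(tf.constant(x * 2))


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(pool.map(worker, [1, 2]))  # -> [2, 4]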

@ryanjulian @CatherineSue Do we need to add tf support for CEM and CMAES in this case?

CatherineSue commented 5 years ago

@amolchanov86 @naeioi Thanks for pointing out the problem. I think there are two solutions:

naeioi commented 5 years ago

Uhh, these two solutions both require some effort. I think the first step we should take is to move those two algorithms under garage.theano.

CatherineSue commented 5 years ago

We are going to remove the theano module soon, and we also need to make sure the tf algos reach parity with theano, so moving them to theano is equivalent to option one. Also, the algorithms don't actually depend on theano; the root reason is that we don't have a policy ready for the algorithm.

I prefer to leave them here, then add tf support or a dependency-free policy later.
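
For context, a dependency-free policy could be as small as a plain numpy MLP exposing a flat parameter vector, which is all CEM/CMA-ES need to perturb, and which pickles into sampler workers with no session at all. A hypothetical sketch (invented names, not garage code):

import numpy as np


class NumpyMLPPolicy:
    """Framework-free policy: just numpy arrays, trivially fork/pickle safe."""

    def __init__(self, obs_dim, act_dim, hidden=32):
        self.shapes = [(obs_dim, hidden), (hidden,),
                       (hidden, act_dim), (act_dim,)]
        self.flat = np.concatenate(
            [0.1 * np.random.randn(int(np.prod(s))) for s in self.shapes])

    # CEM/CMA-ES only need these two methods to evolve the parameters.
    def get_param_values(self):
        return self.flat.copy()

    def set_param_values(self, flat):
        self.flat = np.asarray(flat, dtype=np.float64).copy()

    def get_action(self, obs):
        # Unpack the flat vector into layer weights, then do a forward pass.
        i, layers = 0, []
        for s in self.shapes:
            n = int(np.prod(s))
            layers.append(self.flat[i:i + n].reshape(s))
            i += n
        w1, b1, w2, b2 = layers
        h = np.tanh(obs @ w1 + b1)
        return np.tanh(h @ w2 + b2), {}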

ryanjulian commented 5 years ago

I agree, let's just keep the issue open.