aws / amazon-sagemaker-examples

Example 📓 Jupyter notebooks that demonstrate how to build, train, and deploy machine learning models using 🧠 Amazon SageMaker.
https://sagemaker-examples.readthedocs.io
Apache License 2.0

Sagemaker coach_launcher.py fails to save DDPG model (NO SUPPORT) #896

Closed rishanshah closed 3 years ago

rishanshah commented 4 years ago

I have been trying to train a DDPG model on a custom environment but I cannot get the model to save as there is some trouble with the coach_launcher.py. I get the following Traceback:

algo-1-1ynum_1  | ## agent: Finished evaluation phase. Success rate = 0.0, Avg Total Reward = 717.3
algo-1-1ynum_1  | Traceback (most recent call last):
algo-1-1ynum_1  |   File "train_Coach.py", line 29, in <module>
algo-1-1ynum_1  |     MyLauncher.train_main()
algo-1-1ynum_1  |   File "/opt/ml/code/sagemaker_rl/coach_launcher.py", line 255, in train_main
algo-1-1ynum_1  |     trainer._save_tf_model()
algo-1-1ynum_1  |   File "/opt/ml/code/sagemaker_rl/coach_launcher.py", line 195, in _save_tf_model
algo-1-1ynum_1  |     tf.train.init_from_checkpoint(ckpt_dir,
algo-1-1ynum_1  | NameError: name 'tf' is not defined

EDIT 11/10/2019

I imported tensorflow into a subclassed version of the SageMakerCoachPresetLauncher class in my train_coach.py training script, but a myriad of other problems then arose.

It turns out that the _save_tf_model function in coach_launcher.py was designed specifically to work with the examples (PPO and DQN). A couple of comments in the code say that customers are expected to subclass this class and refine functions where appropriate. The trouble is that I have no idea how to define a function that saves a DDPG model. I believe I have narrowed the problem down to the assignment maps, which are specific to the PPO model, and there is no guidance on how to change them for DDPG. The function in question is shown below:

    def _save_tf_model(self):
        ckpt_dir = '/opt/ml/output/data/checkpoint'
        model_dir = '/opt/ml/model'

        # Re-Initialize from the checkpoint so that you will have the latest models up.
        tf.train.init_from_checkpoint(ckpt_dir,
                                      {'main_level/agent/online/network_0/': 'main_level/agent/online/network_0'})
        tf.train.init_from_checkpoint(ckpt_dir,
                                      {'main_level/agent/online/network_1/': 'main_level/agent/online/network_1'})

        # Create a new session with a new tf graph.
        sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
        sess.run(tf.global_variables_initializer())  # initialize the checkpoint.

        # This is the node that will accept the input.
#         print([n.name for n in tf.get_default_graph().as_graph_def().node])
        input_nodes = tf.get_default_graph().get_tensor_by_name('main_level/agent/main/online/' + \
                                                                'network_0/observation/observation:0')
        # This is the node that will produce the output.
        output_nodes = tf.get_default_graph().get_operation_by_name('main_level/agent/main/online/' + \
                                                                    'network_1/ppo_head_0/policy')
        # Save the model as a servable model.
        tf.saved_model.simple_save(session=sess,
                                   export_dir='model',
                                   inputs={"observation": input_nodes},
                                   outputs={"policy": output_nodes.outputs[0]})
        # Move to the appropriate folder. Don't mind the directory, this just works.
        # rl-cart-pole is the name of the model. Remember it.
        shutil.move('model/', model_dir + '/model/tf-model/00000001/')
        # EASE will pick it up and upload to the right path.
        print("Success")

How does one save and deploy a DDPG model??
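One possible way to approach the DDPG case is to dump the graph's node names (the commented-out print in the function above does this) and search them for the observation input and the actor's output. The sketch below shows only that filtering step in plain Python. The two PPO names are taken from the function above; the `ddpg_head_0` entry is a made-up placeholder, not a confirmed Coach node name, so the real names have to be read from your own graph.

```python
def find_nodes(node_names, *fragments):
    """Return the node names that contain every given fragment."""
    return [name for name in node_names if all(f in name for f in fragments)]


# In a real job this list would come from something like
# [n.name for n in tf.get_default_graph().as_graph_def().node].
node_names = [
    "main_level/agent/main/online/network_0/observation/observation",
    "main_level/agent/main/online/network_1/ppo_head_0/policy",
    # Hypothetical placeholder, for illustration only:
    "main_level/agent/actor/online/network_0/ddpg_head_0/output",
]

# Candidate input placeholders and candidate output heads.
inputs = find_nodes(node_names, "online", "observation/observation")
heads = find_nodes(node_names, "online", "_head_")

print(inputs)
print(heads)
```

Whatever names turn up would then replace the hard-coded PPO strings passed to get_tensor_by_name and get_operation_by_name.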

ALSO

I also tried changing the framework to MXNet, since there is a _save_onnx_model function that I felt would work better than TensorFlow. However, even without saving, MXNet fails when I try to train a DDPG algorithm.

I get the following error:

KeyError: "Unsupported head type: <class 'rl_coach.architectures.head_parameters.DDPGActorHeadParameters'>"
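The wording of this error suggests that the MXNet backend looks up a head implementation by the type of the head parameters and has no entry for the DDPG actor head. The following is a minimal sketch of that failure pattern, not Coach's actual code: both classes and the `SUPPORTED_HEADS` registry are stand-ins invented for illustration.

```python
class PPOHeadParameters:
    """Stand-in for a head type the backend knows how to build."""


class DDPGActorHeadParameters:
    """Stand-in for a head type the backend has no builder for."""


# Hypothetical registry: head-parameter class -> name of its builder.
SUPPORTED_HEADS = {PPOHeadParameters: "ppo_head"}


def build_head(head_params):
    """Look up a builder by parameter type, failing loudly if none exists."""
    head_type = type(head_params)
    if head_type not in SUPPORTED_HEADS:
        raise KeyError("Unsupported head type: {}".format(head_type))
    return SUPPORTED_HEADS[head_type]


try:
    build_head(DDPGActorHeadParameters())
except KeyError as err:
    message = err.args[0]
    print(message)
```

If that reading is right, the fix is not in the preset: the algorithm simply is not available on that backend in this toolkit version.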

What is going on?

haohanchen-aws commented 4 years ago

@rishanshah Hey, can you share your train_coach.py? I would suggest importing tensorflow as tf at the top of the file rather than inside a specific class.
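One caveat worth keeping in mind: Python resolves a function's global names in the module where the function was defined. An import in train_coach.py is therefore visible to methods overridden there, but not to an unmodified _save_tf_model that still lives in coach_launcher.py. The sketch below demonstrates this with a throwaway module standing in for coach_launcher.py and `math` standing in for tensorflow; injecting the name into the launcher module is shown as one possible workaround, not as the recommended fix.

```python
import types

# Stand-in for coach_launcher.py: it uses `tf` without importing it.
launcher_src = '''
class Launcher:
    def save(self):
        return tf.__name__
'''
launcher = types.ModuleType("fake_launcher")
exec(launcher_src, launcher.__dict__)

# Stand-in for `import tensorflow as tf` at the top of the training script.
import math as tf


def try_save(obj):
    """Call save() and report a NameError instead of crashing."""
    try:
        return obj.save()
    except NameError as err:
        return "NameError: {}".format(err)


# The import above lives in this module, so the launcher still cannot see it.
before = try_save(launcher.Launcher())

# Workaround: place the name in the module that actually uses it.
launcher.tf = tf
after = try_save(launcher.Launcher())

print(before)
print(after)
```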

rishanshah commented 4 years ago

@haohanchen-yagao here is my train_coach.py:

from sagemaker_rl.coach_launcher import SageMakerCoachPresetLauncher

class MyLauncher(SageMakerCoachPresetLauncher):

    def default_preset_name(self):
        """This points to a .py file that configures everything about the RL job.
        It can be overridden at runtime by specifying the RLCOACH_PRESET hyperparameter.
        """
        return 'presetsDDPG'

    def map_hyperparameter(self, name, value):
        """Here we configure some shortcut names for hyperparameters that we expect to use frequently.
        Essentially anything in the preset file can be overridden through a hyperparameter with a name
        like "rl.agent_params.algorithm.etc".
        """
        # maps from alias (key) to fully qualified coach parameter (value)
        mapping = {
                      "discount": "rl.agent_params.algorithm.discount",
                      "evaluation_episodes": "rl.evaluation_steps:EnvironmentEpisodes",
                      "improve_steps": "rl.improve_steps:TrainingSteps"
                  }
        if name in mapping:
            self.apply_hyperparameter(mapping[name], value)
        else:
            super().map_hyperparameter(name, value)

if __name__ == '__main__':
    MyLauncher.train_main()

haohanchen-aws commented 4 years ago

@rishanshah Thanks. Do you mind sharing your source code, your model ckpt, and the MetaGraphDef with us so that we can see what the input and output tensors are?

rishanshah commented 4 years ago

@haohanchen-yagao, I am not sure whether I will be able to share the whole code with you but here are some things that will help.

Here are my DDPG presets and graph definitions:

from rl_coach.agents.ddpg_agent import DDPGAgentParameters
from rl_coach.base_parameters import VisualizationParameters
from rl_coach.core_types import TrainingSteps, EnvironmentEpisodes, EnvironmentSteps
from rl_coach.graph_managers.basic_rl_graph_manager import BasicRLGraphManager
from rl_coach.graph_managers.graph_manager import ScheduleParameters
from rl_coach.environments.gym_environment import GymVectorEnvironment
from rl_coach.architectures.layers import Dense
from rl_coach.base_parameters import EmbedderScheme
from rl_coach.filters.filter import InputFilter
from rl_coach.filters.reward.reward_rescale_filter import RewardRescaleFilter

####################
# Graph Scheduling #
####################
schedule_params = ScheduleParameters()
schedule_params.improve_steps = TrainingSteps(10)
schedule_params.steps_between_evaluation_periods = EnvironmentEpisodes(20)
schedule_params.evaluation_steps = EnvironmentEpisodes(1)
schedule_params.heatup_steps = EnvironmentSteps(1000)

#########
# Agent #
#########

agent_params = DDPGAgentParameters()
agent_params.network_wrappers['actor'].input_embedders_parameters['observation'].scheme = [Dense(400)]
agent_params.network_wrappers['actor'].middleware_parameters.scheme = [Dense(300)]
agent_params.network_wrappers['critic'].input_embedders_parameters['observation'].scheme = [Dense(400)]
agent_params.network_wrappers['critic'].middleware_parameters.scheme = [Dense(300)]
agent_params.network_wrappers['critic'].input_embedders_parameters['action'].scheme = EmbedderScheme.Empty
# agent_params = DDPGAgentParameters()
# agent_params.algorithm.num_consecutive_playing_steps = EnvironmentSteps(20)
# agent_params = DDPGAgentParameters()
# agent_params.network_wrappers['actor'].input_embedders_parameters['measurements'] = \
#     agent_params.network_wrappers['actor'].input_embedders_parameters.pop('observation')
# agent_params.network_wrappers['critic'].input_embedders_parameters['measurements'] = \
#     agent_params.network_wrappers['critic'].input_embedders_parameters.pop('observation')
# agent_params.network_wrappers['actor'].input_embedders_parameters['measurements'].scheme = [Dense(300)]
# agent_params.network_wrappers['actor'].middleware_parameters.scheme = [Dense(200)]
# agent_params.network_wrappers['critic'].input_embedders_parameters['measurements'].scheme = [Dense(400)]
# agent_params.network_wrappers['critic'].middleware_parameters.scheme = [Dense(300)]
# agent_params.network_wrappers['critic'].input_embedders_parameters['action'].scheme = EmbedderScheme.Empty
# agent_params.input_filter = InputFilter()
# agent_params.input_filter.add_reward_filter("rescale", RewardRescaleFilter(1/100.))
agent_params.algorithm.action_penalty = 10.5
# agent_params.algorithm.use_non_zero_discount_for_terminal_states  = True
# agent_params.algorithm.clip_critic_targets = (-10.,10.)

agent_params.network_wrappers['actor'].learning_rate = 0.01
agent_params.network_wrappers['critic'].learning_rate = 0.01
agent_params.algorithm.discount = 0.99

###############
# Environment #
###############
# env_params = GymVectorEnvironment(level='envV2Continous:ArrivalSim')
# env_params.additional_simulator_parameters = {'price': 30.0 }
env_params = GymVectorEnvironment(level='continousEnv:Environment')
graph_manager = BasicRLGraphManager(agent_params=agent_params, env_params=env_params,
                                    schedule_params=schedule_params, vis_params=VisualizationParameters())

The invocation:


estimator = RLEstimator(entry_point="train_Coach.py",
                        source_dir='.',
                        dependencies=["sagemaker_rl"],

                        toolkit=RLToolkit.COACH,
                        toolkit_version='0.11.0',
                        framework=RLFramework.TENSORFLOW,
                        role=role,
#                         train_instance_type="ml.m4.4xlarge",
                        train_instance_type='local',

                        train_instance_count=1,
                        output_path=s3_output_path,
                        base_job_name="ddpgtest",
                        hyperparameters = {
                          "RLCOACH_PRESET": "presetsDDPG",
#                           "discount": 0.9,
#                           "evaluation_episodes": 8,
#                           "improve_steps": 1000,
                          "save_model": 1
                        }
                    )

estimator.fit(wait=True)

With regard to the ckpt: I was using the SageMaker default bucket to test whether I could get the model to behave the way I wanted. I am not sure how to retrieve the ckpt file from there.

This is the environment on which I am training the model, continousEnv.py:

from scipy.stats import poisson, bernoulli
import math
import numpy as np
import gym
from gym.spaces import Discrete,Box
import sys

DAYS_OVERALL = 30
INITIAL_PRICE = 1000
ALLOTMENT = 20

def pr_of_booking(current_price):
    asymptote = 1
    slope = 0.01

    shift = INITIAL_PRICE - 100
    pr_booking = asymptote / (1 + math.exp(slope * (current_price - shift)))
    if 0 <= pr_booking <= 1:
        return pr_booking
    else:
        raise ValueError

def random_booking(current_price):
    target_avg_nr_bookings_per_day = ALLOTMENT/DAYS_OVERALL
    target_pr_at_pseudo_market_price = pr_of_booking(INITIAL_PRICE + 100)

    prospective_bookings = poisson.rvs(mu=target_avg_nr_bookings_per_day / target_pr_at_pseudo_market_price,
                                       size=1)[0]
    _sum = 0
    for i in range(prospective_bookings):
        prob = pr_of_booking(current_price=current_price)

        if bernoulli.rvs(prob, size=1)[0] == 1:
            _sum += 1
    return _sum

class Environment(gym.Env):
    """
    Observation:
        Type: Box(4)
        Num  Observation
         0   Deviation from the target curve
         1   Price
         2   Ratio bookings/capacity
         3   Days before departure

    Actions:
        Type: Box(1)
        Num  Action
         0   Price adjustment, continuous in [-100, 100]
    """
    def __init__(self):
        self.action_space = Box(-np.array([100]),np.array([100]),dtype=np.float32)
        high = np.array([1, np.inf, 1, DAYS_OVERALL])
        low = np.array([-1, 0, 0, 0])
        self.observation_space = Box(low, high, dtype=np.float32)
        self.state = None
        self.bookings = [np.NaN for _ in range(DAYS_OVERALL+1)]
        self.price_adjustments = [np.NaN for _ in range(DAYS_OVERALL+1)]
        self.price = [np.NaN for _ in range(DAYS_OVERALL+1)]

    def step(self, action):
        (deviation,
         price,
         ratio,
         dbd) = self.state

        today = int(DAYS_OVERALL - dbd)

#         price_adjustment = {0: -100,
#                             1: -50,
#                             2: 0,
#                             3: 50,
#                             4: 100}[action]
        price_adjustment = action

        self.price_adjustments[today] = action

        if dbd == DAYS_OVERALL:
            new_price = INITIAL_PRICE + price_adjustment

        else:
            new_price = self.price[today-1] + price_adjustment

        self.price[today] = new_price

        self.bookings[today] = random_booking(new_price)

        booked_so_far = sum(self.bookings[:(today + 1)])

        new_ratio = booked_so_far/ALLOTMENT

        target_ratio = today / DAYS_OVERALL
        new_deviation = new_ratio - target_ratio

        new_dbd = dbd - 1

        self.state = np.array([new_deviation,
                               new_price,
                               new_ratio,
                               new_dbd])

        reward = -10*np.log(abs(new_deviation)+sys.float_info.epsilon)
#         reward = -abs(new_deviation)
#         if ALLOTMENT <= booked_so_far:
#             reward -= (dbd * new_deviation) / 2

        done = (dbd == 0) or (ALLOTMENT <= booked_so_far)
        info = dict()

        print(action, self.state, reward, done, info)
        return self.state, reward, done, info

    def reset(self):
        deviation = 0
        price = INITIAL_PRICE
        ratio = 0
        dbd = DAYS_OVERALL
        self.state = np.array([deviation,
                               price,
                               ratio,
                               dbd])
        return self.state

    def render(self, mode=None):
        pass

So the input tensor should be 4 x batch and the output 1 x batch, I think.
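For a sense of scale in the environment above, here is a small stand-alone sketch of the reward formula used in step(). It uses `math.log` in place of `np.log` (same value for scalars) so that it runs without NumPy; `reward_for` is a helper name introduced here, not part of the original code.

```python
import math
import sys


def reward_for(deviation):
    """Same formula as in step(): -10 * log(|deviation| + epsilon)."""
    return -10 * math.log(abs(deviation) + sys.float_info.epsilon)


# Reward at a few deviations from the target booking curve.
for deviation in (0.0, 0.01, 0.5, 1.0):
    print("deviation={:<5} reward={:8.2f}".format(deviation, reward_for(deviation)))
```

Because of the logarithm, a step that lands exactly on the target curve earns roughly 360, while a deviation of 0.5 earns about 7, so a handful of on-target steps can dominate an episode's total reward.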

rishanshah commented 4 years ago

model.tar.gz output (1).tar.gz

@haohanchen-yagao I have figured out where all the data gets saved, and I don't think the model is checkpointing either. I have included what I found in the files.

ReHoss commented 4 years ago

Hi @rishanshah, did you find a solution? I also want to try different RL methods.

hongshanli23 commented 3 years ago

Dear SageMaker Community,

In our attempts to provide this repository with a better level of support going forward, we’re closing issues that were opened prior to the v2 release of the SDK. We believe that many of the issues posted over time have been solved by the latest release or by other recent changes to the repo, and closing them will help us reallocate resources towards issues that are more likely to still be relevant today.

Some of the issues experienced now can be resolved by referencing the v2 guide (https://sagemaker.readthedocs.io/en/stable/v2.html), which offers simple solutions to common notebook errors, like the renaming of parameters and classes.

If you believe your issue is still ongoing and you have updated error messaging or other info, please re-open it and we will investigate.

Best Regards,
AWS SageMaker Team
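As an illustration of the parameter renaming mentioned in that notice, the estimator call earlier in this thread uses the v1 names train_instance_type and train_instance_count. The sketch below translates a few v1 keyword names to their v2 equivalents as I understand them from the migration guide; `to_v2_kwargs` is a hypothetical helper, not an SDK function, and the table should be checked against the guide before relying on it.

```python
# v1 keyword -> v2 keyword (a partial list; verify against the v2 guide).
V2_RENAMES = {
    "train_instance_type": "instance_type",
    "train_instance_count": "instance_count",
    "train_volume_size": "volume_size",
    "train_max_run": "max_run",
}


def to_v2_kwargs(v1_kwargs):
    """Return a copy of the kwargs with known v1 names renamed."""
    return {V2_RENAMES.get(key, key): value for key, value in v1_kwargs.items()}


# A subset of the arguments used in the invocation earlier in the thread.
v1_kwargs = {
    "entry_point": "train_Coach.py",
    "train_instance_type": "local",
    "train_instance_count": 1,
    "base_job_name": "ddpgtest",
}

v2_kwargs = to_v2_kwargs(v1_kwargs)
print(v2_kwargs)
```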

sichoi85 commented 3 years ago

model.tar.gz output (1).tar.gz

@haohanchen-yagao I have figured out where all the data gets saved, and I don't think the model is checkpointing either. I have included what I found in the files.

Could you please share what your solution is to this issue? I'm having the same exact issue.

rachelpoint commented 2 months ago

In case anyone else is still stuck on this: I solved it by switching from Coach to Ray, subclassing the launcher in the SageMaker ray_launcher.py, and overriding it with this:

import os
import json
import time

from ray.tune.registry import register_env
import hivenv
import hivparameters as params
from sagemaker_rl.ray_launcher import SageMakerRayLauncher
from sagemaker_rl.configuration_list import ConfigurationList
import sagemaker_rl.tf_serving_utils as utils

ENV_NAME = "HIVEnv"
INTERMEDIATE_DIR = "/opt/ml/output/intermediate"
FINAL_MODEL_DIR = os.getenv("SM_MODEL_DIR", "/opt/ml/model")
METADATA_DIR = "/opt/ml/output/data"
TERMINATION_SIGNAL = "JOB_TERMINATED"
CHECKPOINT_DIR = "/opt/ml/input/data/checkpoint"
MODEL_OUTPUT_DIR = "/opt/ml/model"

env_config = {}

class SageMakerHIVLauncher(SageMakerRayLauncher):

    def register_env_creator(self):
        """Register the (custom) env to make it available to the ray nodes"""
        # Choose a name and register your custom environment
        register_env("HIVEnv-v0", lambda env_config: hivenv.HIVEnv(env_config))

    def get_experiment_config(self):
        return params.experiment_config

    def copy_checkpoints_to_model_output(self):
        checkpoints = []
        count = 0
        while not checkpoints:
            count += 1
            for root, directories, filenames in os.walk(INTERMEDIATE_DIR):
                for dirname in directories:  # avoid shadowing the built-in dir()
                    if dirname.startswith("checkpoint"):
                        checkpoints.append(os.path.join(root, dirname))
            if not checkpoints:
                # Give up after six attempts; otherwise wait and retry.
                if count >= 6:
                    raise RuntimeError("Failed to find checkpoint files")
                time.sleep(5)

        checkpoints.sort(key=utils.natural_keys)
        print(checkpoints[-1])

        # #####  Save has to be done here, while we're grabbing checkpoints
        from ray.rllib.algorithms.algorithm import Algorithm
        my_new_dqn = Algorithm.from_checkpoint(checkpoints[-1])
        utils.export_tf_serving(my_new_dqn, MODEL_OUTPUT_DIR)

    def create_tf_serving_model(self, algorithm=None, env_string=None):
        self.register_env_creator()
        # No-op otherwise -- in the new ray/tensorflow code, it has
        # to be done earlier in the process

    def customize_experiment_config(self, config):
        """Applies command-line hyperparameters to the config."""
        hyperparams_dict = json.loads(os.environ.get("SM_HPS", "{}"))
        hyperparams_dict["rl.training.local_dir"] = INTERMEDIATE_DIR

        self.hyperparameters = ConfigurationList()
        for name, value in hyperparams_dict.items():
            if name.startswith("rl."):
                self.hyperparameters.store(name, value)
        self.hyperparameters.apply_subset(config, "rl.")
        print(self.hyperparameters)
        return config

if __name__ == "__main__":
    SageMakerHIVLauncher().train_main()

Hope this helps!
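The checkpoint lookup in the launcher above can be tried out on its own. The following is a minimal sketch under stated assumptions: `natural_keys` here is my own reimplementation of a natural sort key (the launcher relies on the one in sagemaker_rl.tf_serving_utils), `find_latest_checkpoint` is a hypothetical helper name, and the `trial/checkpoint_N` layout is invented for the demo.

```python
import os
import re
import tempfile
import time


def natural_keys(text):
    """Split out digit runs so 'checkpoint_10' sorts after 'checkpoint_9'."""
    return [int(part) if part.isdigit() else part
            for part in re.split(r"(\d+)", text)]


def find_latest_checkpoint(root_dir, attempts=6, delay=5.0):
    """Poll root_dir for 'checkpoint*' directories and return the newest."""
    for attempt in range(attempts):
        found = [
            os.path.join(root, name)
            for root, dirs, _files in os.walk(root_dir)
            for name in dirs
            if name.startswith("checkpoint")
        ]
        if found:
            return sorted(found, key=natural_keys)[-1]
        if attempt < attempts - 1:
            time.sleep(delay)  # wait before the next scan
    raise RuntimeError("Failed to find checkpoint files")


# Demo on a temporary directory with an invented layout.
with tempfile.TemporaryDirectory() as tmp:
    for step in (2, 9, 10):
        os.makedirs(os.path.join(tmp, "trial", "checkpoint_{}".format(step)))
    latest_name = os.path.basename(find_latest_checkpoint(tmp, delay=0.0))
    print(latest_name)
```

A plain string sort would pick checkpoint_9 here, which is why the natural sort key matters when choosing which checkpoint to export.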