aws / sagemaker-python-sdk

A library for training and deploying machine learning models on Amazon SageMaker
https://sagemaker.readthedocs.io/
Apache License 2.0

loading pre-trained weights for keras model is not supported in distributed training #264

Closed: WuyangLI closed this issue 6 years ago

WuyangLI commented 6 years ago

System Information

Describe the problem

I created a distributed training job which trains a transfer-learning model using VGG16. The job succeeds if I don't load pre-trained weights when creating the VGG16 backbone model:

    backend = tf.keras.applications.vgg16.VGG16(weights=None, 
                                                include_top=False,
                                                input_shape=(224, 224, 3))

However, an exception is thrown when I try to load pre-trained weights, as done in the following code snippet:

    backend = tf.keras.applications.vgg16.VGG16(weights='imagenet', 
                                                include_top=False,
                                                input_shape=(224, 224, 3))

Note that for non-distributed training, loading pre-trained weights does not cause any exception.

Minimal repro / logs

InvalidArgumentError (see above for traceback): Cannot assign a device for operation 'block5_conv3/bias': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
#011 [[Node: block5_conv3/bias = VariableV2[_class=["loc:@block5_conv3/bias"], container="", dtype=DT_FLOAT, shape=[512], shared_name="", _device="/job:ps/task:0"]()]]

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/container_support/training.py", line 36, in start
    fw.train()
  File "/usr/local/lib/python2.7/dist-packages/tf_container/train_entry_point.py", line 164, in train
    train_wrapper.train()
  File "/usr/local/lib/python2.7/dist-packages/tf_container/trainer.py", line 73, in train
    tf.estimator.train_and_evaluate(estimator=estimator, train_spec=train_spec, eval_spec=eval_spec)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 439, in train_and_evaluate
    executor.run()
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 546, in run
    getattr(self, task_to_run)()
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 601, in run_master
    self._start_distributed_training(saving_listeners=saving_listeners)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 739, in _start_distributed_training
    saving_listeners=saving_listeners)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 363, in train
    loss = self._train_model(input_fn, hooks, saving_listeners)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 843, in _train_model
    return self._train_model_default(input_fn, hooks, saving_listeners)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 856, in _train_model_default
    features, labels, model_fn_lib.ModeKeys.TRAIN, self.config)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 831, in _call_model_fn
    model_fn_results = self._model_fn(features=features, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tf_container/trainer.py", line 108, in _model_fn
    return self.customer_script.model_fn(features, labels, mode, params)
  File "/opt/ml/code/keras_distributed_transfer_learning.py", line 14, in model_fn
    input_shape=(224, 224, 3))
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/applications/vgg16.py", line 225, in VGG16
    model.load_weights(weights_path)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/engine/network.py", line 1190, in load_weights
    saving.load_weights_from_hdf5_group(f, self.layers)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/engine/saving.py", line 719, in load_weights_from_hdf5_group
    K.batch_set_value(weight_value_tuples)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/backend.py", line 2707, in batch_set_value
    get_session().run(assign_ops, feed_dict=feed_dict)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/backend.py", line 442, in get_session
    _initialize_variables(session)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/backend.py", line 666, in _initialize_variables
    [variables_module.is_variable_initialized(v) for v in candidate_vars])
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 900, in run
    run_metadata_ptr)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1135, in _run
    feed_dict_tensor, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1316, in _do_run
    run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1335, in _do_call
    raise type(e)(node_def, op, message)

Launcher code:

    import sagemaker
    from sagemaker.tensorflow import TensorFlow
    from sagemaker.session import s3_input
    from sagemaker import get_execution_role

    sagemaker_session = sagemaker.Session()
    role = get_execution_role()
    training_steps = 100
    evaluation_steps = 10

    estimator = TensorFlow(entry_point='keras_distributed_transfer_learning.py',
                           source_dir='./',
                           role=role,
                           training_steps=training_steps,
                           evaluation_steps=evaluation_steps,
                           train_instance_count=2,
                           train_instance_type='ml.p2.xlarge',
                           input_mode='File')

    input_dataset = s3_input('s3://xxxx/cats_and_dogs/')
    estimator.fit(input_dataset)

keras_distributed_transfer_learning.py

    import tensorflow as tf
    from tensorflow.python.estimator.model_fn import ModeKeys as Modes

    INPUT_TENSOR_NAME = "input_1"
    NUM_CLASSES = 2
    BATCH_SIZE = 10


    def model_fn(features, labels, mode, params):
        """The model_fn argument for creating an Estimator."""
        backend = tf.keras.applications.vgg16.VGG16(weights='imagenet',
                                                    include_top=False,
                                                    input_shape=(224, 224, 3))
        x = backend.output
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')(x)
        model = tf.keras.models.Model(inputs=backend.input, outputs=x)
        image = tf.keras.layers.Input(tensor=features[INPUT_TENSOR_NAME])

        # Define operations
        if mode in (Modes.PREDICT, Modes.EVAL):
            logits = model(image, training=False)
            predicted_indices = tf.argmax(input=logits, axis=1)
            probabilities = tf.nn.softmax(logits, name='softmax_tensor')

        if mode == Modes.TRAIN:
            logits = model(image, training=True)
            global_step = tf.train.get_or_create_global_step()
            loss = tf.losses.softmax_cross_entropy(
                onehot_labels=labels, logits=logits)
            tf.summary.scalar('OptimizeLoss', loss)

        if mode == Modes.EVAL:
            logits = model(image, training=False)
            global_step = tf.train.get_or_create_global_step()
            loss = tf.losses.softmax_cross_entropy(
                onehot_labels=labels, logits=logits)
            tf.summary.scalar('OptimizeLoss', loss)

        if mode == Modes.PREDICT:
            predictions = {
                'classes': predicted_indices,
                'probabilities': probabilities
            }
            export_outputs = {
                'predictions': tf.estimator.export.PredictOutput(predictions)
            }
            return tf.estimator.EstimatorSpec(
                mode, predictions=predictions, export_outputs=export_outputs)

        if mode == Modes.TRAIN:
            optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
            train_op = optimizer.minimize(loss, global_step=global_step)
            return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

        if mode == Modes.EVAL:
            eval_metric_ops = {
                'accuracy': tf.metrics.accuracy(tf.argmax(labels, 1), predicted_indices)
            }
            return tf.estimator.EstimatorSpec(
                mode, loss=loss, eval_metric_ops=eval_metric_ops)


    def _input_fn(training_dir, input_shape, batch_size):
        generator = tf.keras.preprocessing.image.ImageDataGenerator().flow_from_directory(
            training_dir, target_size=input_shape, batch_size=batch_size)

        tensor_shapes = (tf.TensorShape([None, input_shape[0], input_shape[1], 3]),
                         tf.TensorShape([None, NUM_CLASSES]))
        tensor_types = (tf.float32, tf.float32)
        dataset = tf.data.Dataset.from_generator(lambda: generator, tensor_types, tensor_shapes)
        features, labels = dataset.make_one_shot_iterator().get_next()
        return {INPUT_TENSOR_NAME: features}, labels


    def train_input_fn(training_dir, hyperparameters):
        return _input_fn(training_dir + '/train/', (224, 224), BATCH_SIZE)


    def eval_input_fn(training_dir, hyperparameters):
        return _input_fn(training_dir + '/test/', (224, 224), BATCH_SIZE)


    def serving_input_fn(hyperparameters):
        inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, 224, 224, 3])}
        return tf.estimator.export.ServingInputReceiver(inputs, inputs)

nadiaya commented 6 years ago

A fix to the SageMaker TensorFlow container was deployed earlier today.

Do you still experience the issue?

WuyangLI commented 6 years ago

Hi @nadiaya, unfortunately I still get the same issue.

2018-07-03 10:02:28,193 INFO - tensorflow - Calling model_fn.
Downloading s3://sagemaker-us-east-1-424543879259/sagemaker-tensorflow-2018-07-03-09-55-41-612/source/sourcedir.tar.gz to /tmp/script.tar.gz
Found 24800 images belonging to 2 classes.
Downloading data from https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
58900480/58889256 [==============================] - 1s 0us/step
2018-07-03 10:02:29.214459: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1435] Adding visible gpu devices: 0
2018-07-03 10:02:29.214538: I tensorflow/core/common_runtime/gpu/gpu_device.cc:923] Device interconnect StreamExecutor with strength 1 edge matrix:
2018-07-03 10:02:29.214554: I tensorflow/core/common_runtime/gpu/gpu_device.cc:929]      0 
2018-07-03 10:02:29.214563: I tensorflow/core/common_runtime/gpu/gpu_device.cc:942] 0:   N 
2018-07-03 10:02:29.214737: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1053] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 10763 MB memory) -> physical GPU (device: 0, name: Tesla K80, pci bus id: 0000:00:1e.0, compute capability: 3.7)
2018-07-03 10:02:29,253 ERROR - container_support.training - uncaught exception during training: Cannot assign a device for operation 'block5_conv3/bias': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0, /job:localhost/replica:0/task:0/device:GPU:0 ]. Make sure the device specification refers to a valid device.
#011 [[Node: block5_conv3/bias = VariableV2[_class=["loc:@block5_conv3/bias"], container="", dtype=DT_FLOAT, shape=[512], shared_name="", _device="/job:ps/task:0"]()]]

Caused by op u'block5_conv3/bias', defined at:
File "/usr/local/bin/entry.py", line 28, in <module>
modes[mode]()
File "/usr/local/lib/python2.7/dist-packages/container_support/training.py", line 36, in start
fw.train()
File "/usr/local/lib/python2.7/dist-packages/tf_container/train_entry_point.py", line 164, in train
train_wrapper.train()
File "/usr/local/lib/python2.7/dist-packages/tf_container/trainer.py", line 73, in train
tf.estimator.train_and_evaluate(estimator=estimator, train_spec=train_spec, eval_spec=eval_spec)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 439, in train_and_evaluate
executor.run()
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 546, in run
getattr(self, task_to_run)()
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 601, in run_master
self._start_distributed_training(saving_listeners=saving_listeners)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/training.py", line 739, in _start_distributed_training
saving_listeners=saving_listeners)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 363, in train
loss = self._train_model(input_fn, hooks, saving_listeners)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 843, in _train_model
return self._train_model_default(input_fn, hooks, saving_listeners)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 856, in _train_model_default
features, labels, model_fn_lib.ModeKeys.TRAIN, self.config)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/estimator/estimator.py", line 831, in _call_model_fn
model_fn_results = self._model_fn(features=features, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tf_container/trainer.py", line 108, in _model_fn
return self.customer_script.model_fn(features, labels, mode, params)
File "/opt/ml/code/keras_distributed_transfer_learning.py", line 14, in model_fn
input_shape=(224, 224, 3))
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/applications/vgg16.py", line 187, in VGG16
x)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/keras/_impl/keras/engine/base_layer.py", line 314, in __call__
output = super(Layer, self).__call__(inputs, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/layers/base.py", line 699, in __call__
self.build(input_shapes)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/layers/convolutional.py", line 152, in build
dtype=self.dtype)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/layers/base.py", line 546, in add_variable
partitioner=partitioner)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/checkpointable.py", line 436, in _add_variable_with_custom_getter
**kwargs_for_getter)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 1317, in get_variable
constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 1079, in get_variable
constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 425, in get_variable
constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 394, in _true_getter
use_resource=use_resource, constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 786, in _get_single_variable
use_resource=use_resource)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 2220, in variable
use_resource=use_resource)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 2210, in <lambda>
previous_getter = lambda **kwargs: default_variable_creator(None, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variable_scope.py", line 2193, in default_variable_creator
constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variables.py", line 235, in __init__
constraint=constraint)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variables.py", line 349, in _init_from_args
name=name)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/state_ops.py", line 137, in variable_op_v2
shared_name=shared_name)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/gen_state_ops.py", line 1255, in variable_v2
shared_name=shared_name, name=name)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
op_def=op_def)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 3392, in create_op
op_def=op_def)
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 1718, in __init__
self._traceback = self._graph._extract_stack() # pylint: disable=protected-access

Logs about the session config for the master instance:

2018-07-03 10:02:24,678 INFO - tensorflow - Using config: {'_save_checkpoints_secs': 300, '_session_config': None, '_keep_checkpoint_max': 5, '_tf_random_seed': None, '_task_type': u'master', '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f99018162d0>, '_model_dir': u's3://sagemaker-us-east-1-424543879259/sagemaker-tensorflow-2018-07-03-09-55-41-612/checkpoints', '_num_worker_replicas': 2, '_task_id': 0, '_log_step_count_steps': 100, '_master': u'grpc://algo-1:2222', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_evaluation_master': '', '_service': None, '_global_id_in_cluster': 0, '_save_summary_steps': 100, '_num_ps_replicas': 2}
WuyangLI commented 6 years ago

@nadiaya, in addition to the error log, I found something that I hope can help you diagnose the problem. If I don't load the pre-trained weights, as done below:

    backend = tf.keras.applications.vgg16.VGG16(weights=None,
                                                include_top=False,
                                                input_shape=(224, 224, 3))

the distributed job succeeds, and in the log I can find a line like the following:

2018-06-27 20:42:32.809077: I tensorflow/core/distributed_runtime/master_session.cc:1136] Start master session 9db04a30806d6f3b with config: allow_soft_placement: true

This line shows that the master session is created with `allow_soft_placement: true`: if an operation is assigned to a device that is not available on the instance, it is placed on an available device instead.
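For reference, soft device placement is an ordinary TF 1.x session option; a minimal sketch (not part of the original report) that enables it explicitly when creating a session:

    import tensorflow as tf

    # With allow_soft_placement=True, an op pinned to a device that does not
    # exist locally is placed on an available device instead of raising
    # InvalidArgumentError, as in the failure above.
    config = tf.ConfigProto(allow_soft_placement=True)
    with tf.Session(config=config) as sess:
        sess.run(tf.global_variables_initializer())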

However, when the pre-trained weights are loaded, as done below:

    backend = tf.keras.applications.vgg16.VGG16(weights='imagenet',
                                                include_top=False,
                                                input_shape=(224, 224, 3))

I can't find anything about the master session in the log, which implies that the distributed training job failed before the master session started.

iquintero commented 6 years ago

Hi @WuyangLI I was able to reproduce this issue and I'm working on it. Will post an update when I have a fix.

iquintero commented 6 years ago

Hi again @WuyangLI

After doing some research, it seems like this is a common problem with Keras and distributed training. The issue is that this block:

    tf.keras.applications.vgg16.VGG16(weights='imagenet',
                                      include_top=False,
                                      input_shape=(224, 224, 3))

internally creates its own tf.Session() object with default parameters. The fix is to set a properly configured session for Keras operations. This is how you can do it:

    import json  # add at the top of the training script
    import os

    sm_tf_config = json.loads(os.environ['TF_CONFIG'])
    master = sm_tf_config['cluster']['master'][0]
    session = tf.Session('grpc://' + master)
    tf.keras.backend.set_session(session)
    backend = tf.keras.applications.vgg16.VGG16(weights='imagenet',
                                                include_top=False,
                                                input_shape=(224, 224, 3))
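For context, TF_CONFIG is the cluster description that the SageMaker TensorFlow container sets for each instance, and the snippet above reads the master host from it. An illustrative sketch of its shape (the master host matches the '_master' entry in the config log above; the worker/ps ports and exact layout are assumptions for illustration):

    # Hypothetical TF_CONFIG contents for a 2-instance job.
    example_tf_config = {
        "cluster": {
            "master": ["algo-1:2222"],
            "worker": ["algo-2:2222"],
            "ps": ["algo-1:2223", "algo-2:2223"]
        },
        "task": {"type": "master", "index": 0}
    }

    # The fix reads the master address like this:
    master = example_tf_config['cluster']['master'][0]  # 'algo-1:2222'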
WuyangLI commented 6 years ago

@iquintero, thank you so much for your fix, it worked like a charm. Before reaching out to your team, I tried creating a session and setting it as the Keras backend session, but it didn't work:

    sess = tf.Session()
    tf.keras.backend.set_session(sess)
    backend = tf.keras.applications.vgg16.VGG16(weights='imagenet',
                                                include_top=False,
                                                input_shape=(224, 224, 3))

If I understand your code correctly, it basically connects to the TensorFlow master session and sets that as the Keras session. I would appreciate it if you could explain why your code works but mine doesn't.

Thank you :)

iquintero commented 6 years ago

Sure!

The difference is that calling tf.Session() with no arguments returns a local, in-process session, whereas in this case we need a remote session. The fix I posted gathers the master server address from the TF_CONFIG environment variable that SageMaker creates.

You can find some information about it here: https://www.tensorflow.org/programmers_guide/graphs

specifically:

    # Create a default in-process session.
    with tf.Session() as sess:
      # ...

    # Create a remote session.
    with tf.Session("grpc://example.org:2222"):
      # ...
WuyangLI commented 6 years ago

@iquintero :) Thank you so much!