kafka399 opened this issue 4 years ago
PipeModeDataset (like Pipe mode itself) is designed for, and only available in, training.
Could you help us better understand your use case to use it for inference?
I built a model in Pipe mode using the PipeModeDataset class. I'm able to train it and I get validation metrics, so the next logical step is to run inference. I deployed it as an endpoint and got the error above; the same happens with a batch transform job. My script is below.
```python
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
import boto3
import io
import argparse, os
from tensorflow.keras.utils import multi_gpu_model
from tensorflow.keras import backend as K
import json
from sagemaker_tensorflow import PipeModeDataset
import logging

logging.getLogger().setLevel(logging.INFO)
logging.getLogger("tensorflow").setLevel(logging.INFO)

BUFFER_SIZE = 10000
hidden_size = 16


def load_dataset(channel, batch_size=64):
    feature_description = {
        'data': tf.io.FixedLenFeature([50*11], tf.float32),
        'outcome': tf.io.FixedLenFeature([], tf.float32),
    }

    def _parse_function(example_proto):
        # Parse the input `tf.Example` proto using the dictionary above.
        parsed_features = tf.io.parse_single_example(example_proto, feature_description)
        x = tf.reshape(parsed_features['data'], (50, 11))
        y = parsed_features['outcome']
        return x, y

    dataset = PipeModeDataset(channel=channel, record_format='TFRecord')
    dataset = dataset.repeat(20)
    dataset = dataset.prefetch(10)
    dataset = dataset.map(_parse_function, num_parallel_calls=10)
    dataset = dataset.batch(batch_size)
    return dataset  # .make_one_shot_iterator()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--batch-size', type=int, default=585)
    parser.add_argument('--dropout', type=float, default=0.455)
    parser.add_argument('--gpu-count', type=int, default=os.environ['SM_NUM_GPUS'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--validation', type=str, default=os.environ['SM_CHANNEL_VALIDATION'])
    args, _ = parser.parse_known_args()

    epochs = args.epochs
    lr = args.learning_rate
    batch_size = args.batch_size
    gpu_count = args.gpu_count
    model_dir = args.model_dir
    training_dir = args.training
    validation_dir = args.validation
    dropout = args.dropout
    features = 11
    window_size = 50

    train_data_single = load_dataset('training', batch_size)
    val_data_single = load_dataset('validation', batch_size)
    print(train_data_single)

    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(hidden_size, input_shape=(window_size, features), return_sequences=True))
    model.add(tf.keras.layers.Dropout(rate=dropout))
    model.add(tf.keras.layers.LSTM(hidden_size, return_sequences=False))
    model.add(tf.keras.layers.Dropout(rate=dropout))
    model.add(tf.keras.layers.Dense(1, kernel_initializer='normal'))

    if gpu_count > 1:
        model = multi_gpu_model(model, gpus=gpu_count)
        print('going GPU')

    model.compile(loss='mse', optimizer='adam', metrics=['mse', 'mae', 'mape'])  # Using mse loss results in faster convergence
    model.summary()

    single_step_history = model.fit(train_data_single,
                                    epochs=epochs,
                                    steps_per_epoch=10,
                                    validation_steps=2,
                                    # verbose=2,
                                    validation_data=val_data_single)
    print('fit part is done')

    sess = K.get_session()
    print('save is starting')
    print(model.input)
    print('saving the model.......')
    tf.saved_model.simple_save(
        tf.keras.backend.get_session(),
        os.path.join(model_dir, 'model/1'),
        inputs={'x': model.input},
        outputs={t.name: t for t in model.outputs})
```
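For reference, the data layout that `_parse_function` and `dataset.batch` produce can be checked in plain Python: each record's flat `data` feature holds 50 * 11 = 550 floats, and `tf.reshape` reads them in row-major order into a (50, 11) window. This is a stdlib-only sketch; `reshape_window` and `batch` are hypothetical helpers, not TensorFlow APIs.

```python
from typing import Iterable, Iterator, List

WINDOW_SIZE = 50  # time steps per window, as in the script
FEATURES = 11     # features per time step


def reshape_window(flat: List[float], rows: int = WINDOW_SIZE,
                   cols: int = FEATURES) -> List[List[float]]:
    """Row-major reshape of a flat list into rows x cols, like tf.reshape."""
    if len(flat) != rows * cols:
        raise ValueError(f"expected {rows * cols} values, got {len(flat)}")
    return [flat[r * cols:(r + 1) * cols] for r in range(rows)]


def batch(records: Iterable, batch_size: int) -> Iterator[list]:
    """Group records into lists of at most batch_size, like Dataset.batch."""
    current = []
    for rec in records:
        current.append(rec)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        yield current  # final partial batch is kept (drop_remainder=False)


if __name__ == "__main__":
    x = reshape_window([float(i) for i in range(WINDOW_SIZE * FEATURES)])
    print(len(x), len(x[0]), x[1][0])  # 50 11 11.0
    print([len(b) for b in batch([(x, 0.5)] * 25, batch_size=10)])  # [10, 10, 5]
```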
This is expected behavior: PipeModeDataset is designed to take advantage of the Pipe mode feature of SageMaker Training, and Pipe mode is not available in either hosting or batch transform.
If you need to predict on large quantities of data at once, I would recommend looking into batch transform, which doesn't require Pipe mode or the PipeModeDataset class.
Do I read this correctly: PipeModeDataset allows you to train a model, but the model can't be hosted as a SageMaker endpoint or run as a batch transform job? If so, I wonder what the whole purpose of PipeModeDataset is. Just to get metrics by training on a dataset that doesn't fit into memory (2 GB)?
PipeModeDataset is not part of the model. A model trained using Pipe mode can be hosted as a SageMaker endpoint, used in a batch transform job, or served outside of SageMaker.
With Pipe input mode, your dataset is streamed directly to your training instances instead of being downloaded first (as it is in File input mode). It only defines how the dataset is provided and doesn't affect the model.
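A minimal stdlib sketch of that separation, assuming a toy linear "model" (`WEIGHTS`, `predict` and the two record sources are hypothetical): the same function gives the same predictions whether the records were fully materialized up front (File-mode analogue) or parsed one at a time from a stream (Pipe-mode analogue).

```python
import io
from typing import Iterable, Iterator, List

# Hypothetical toy "model": a fixed linear function of one record.
WEIGHTS = [0.5, -1.0, 2.0]


def predict(record: List[float]) -> float:
    """The model only sees a record; it has no idea how the record arrived."""
    return sum(w * v for w, v in zip(WEIGHTS, record))


def file_mode_records(text: str) -> List[List[float]]:
    """File-mode analogue: the whole dataset is materialized before use."""
    return [[float(v) for v in line.split(",")] for line in text.splitlines()]


def pipe_mode_records(stream: Iterable[str]) -> Iterator[List[float]]:
    """Pipe-mode analogue: records are parsed one at a time as they are read."""
    for line in stream:
        line = line.strip()
        if line:
            yield [float(v) for v in line.split(",")]


if __name__ == "__main__":
    data = "1,2,3\n4,5,6\n"
    from_file = [predict(r) for r in file_mode_records(data)]
    from_pipe = [predict(r) for r in pipe_mode_records(io.StringIO(data))]
    print(from_file, from_pipe)  # [4.5, 9.0] [4.5, 9.0]
```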
If I can deploy my model as an endpoint, then something must be wrong with the code above. I can deploy the same model trained in File mode (with some adjustments), but it throws an error when trained in Pipe mode.
So, back to my first post: why does a model trained in Pipe mode fail, and specifically, why does the endpoint expect the PipeModeDataset op to be present? The error is: `Not found: Op type not registered 'PipeModeDataset' in binary running on model.aws.local`
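One way to picture this failure, as a hypothetical stdlib sketch (`load_graph`, `SERVING_OPS`, `SAVED_GRAPH` and the node names are made up; the message text is modeled on the error quoted above): when a serving binary imports a saved graph, every node's op type must be registered in that binary, including nodes that no signature ever uses. A single leftover `PipeModeDataset` node is therefore enough to make the load fail.

```python
from typing import Dict, Set


class NotFoundError(Exception):
    """Stand-in for the serving-side 'Not found' error."""


# Hypothetical op registry of a serving binary built without the
# sagemaker_tensorflow custom op.
SERVING_OPS: Set[str] = {"Placeholder", "MatMul", "BiasAdd", "Identity"}

# Hypothetical saved graph (node name -> op type). The dataset node is not
# used by the prediction signature, but it is still present in the file.
SAVED_GRAPH: Dict[str, str] = {
    "x": "Placeholder",
    "dense/MatMul": "MatMul",
    "dense/BiasAdd": "BiasAdd",
    "output": "Identity",
    "PipeModeDataset": "PipeModeDataset",
}


def load_graph(nodes: Dict[str, str], registered_ops: Set[str],
               host: str = "model.aws.local") -> Dict[str, str]:
    """Import every node; fail on the first op type the binary doesn't know."""
    for op_type in nodes.values():
        if op_type not in registered_ops:
            raise NotFoundError(
                f"Op type not registered '{op_type}' in binary running on {host}")
    return nodes


if __name__ == "__main__":
    try:
        load_graph(SAVED_GRAPH, SERVING_OPS)
    except NotFoundError as err:
        print(err)
    pruned = {k: v for k, v in SAVED_GRAPH.items() if v != "PipeModeDataset"}
    print(len(load_graph(pruned, SERVING_OPS)))  # 4
```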
Which TF version do you use? Could you provide a sample of how you create the endpoint and run predictions?
Code to train a model:
```python
import time

from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow

role = get_execution_role()  # assumes a SageMaker notebook; otherwise pass an IAM role ARN

tf_estimator_pipe2 = TensorFlow(entry_point='keras_pipe.py',
                                role=role,
                                train_instance_count=1,
                                input_mode='Pipe',
                                train_instance_type='ml.p3.8xlarge',
                                framework_version='1.12',
                                py_version='py3',
                                script_mode=True,
                                train_use_spot_instances=True,
                                train_max_wait=36000,
                                train_max_run=36000,
                                hyperparameters={
                                    'dropout': 0.22350414495308987,
                                    'epochs': 1,
                                    'batch-size': 10,
                                    'learning-rate': 0.01})

tf_endpoint_name = 'tf-estimator-pipe-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
tf_estimator_pipe2.fit({'training': 's3://bucket/tfrecords/training/',
                        'validation': 's3://bucket/tfrecords/validation/'},
                       job_name=tf_endpoint_name)
```
Endpoint deployment:
```python
tf_endpoint_name = 'keras-tf-pipe-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
tf_predictor_pipe = tf_estimator_pipe2.deploy(initial_instance_count=1,
                                              instance_type='ml.p3.8xlarge',
                                              endpoint_name=tf_endpoint_name)
```
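On the prediction side, the model exported by the script has a single input `x` of shape (50, 11), so a request body in TensorFlow Serving's REST `{"instances": [...]}` row format could be built as below. This is a sketch: `make_request` is a hypothetical helper and the all-zeros window is placeholder data.

```python
import json
from typing import List

WINDOW_SIZE = 50  # time steps per window, as in the training script
FEATURES = 11     # features per time step

Window = List[List[float]]


def make_request(windows: List[Window]) -> str:
    """Serialize windows as a TF Serving REST 'instances' request body."""
    for w in windows:
        if len(w) != WINDOW_SIZE or any(len(row) != FEATURES for row in w):
            raise ValueError(f"each window must be {WINDOW_SIZE} x {FEATURES}")
    return json.dumps({"instances": windows})


if __name__ == "__main__":
    placeholder = [[0.0] * FEATURES for _ in range(WINDOW_SIZE)]
    body = make_request([placeholder, placeholder])
    print(len(json.loads(body)["instances"]))  # 2
```

With SageMaker Python SDK v1, the predictor returned by `deploy()` for this framework version is expected to JSON-serialize a dict, so something like `tf_predictor_pipe.predict({"instances": windows})` should send the same body; treat that as an assumption and check it against your SDK version.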
The problem is that, for some reason, the PipeModeDataset op is being included in the graph saved by:
```python
tf.saved_model.simple_save(
    tf.keras.backend.get_session(),
    os.path.join(model_dir, 'model/1'),
    inputs={'x': model.input},
    outputs={t.name: t for t in model.outputs})
```
(even though it's not a necessary node on the path from `inputs` to `outputs`). I've seen this issue too and am trying to figure out how to restrict which parts of the graph get exported.
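Restricting the export amounts to keeping only the nodes reachable backward from the output tensors. A stdlib sketch of that pruning on a toy graph (node names are hypothetical; each node maps to the nodes it reads from) shows the `PipeModeDataset` branch dropping out. TF 1.x offers a comparable helper on `GraphDef`s, `tf.graph_util.extract_sub_graph(graph_def, dest_nodes)`, though this sketch does not use it.

```python
from collections import deque
from typing import Dict, Iterable, List

# Hypothetical training-time graph: node -> list of nodes it reads from.
GRAPH: Dict[str, List[str]] = {
    "PipeModeDataset": [],
    "IteratorGetNext": ["PipeModeDataset"],
    "x": [],                        # the model's input placeholder
    "lstm/output": ["x"],
    "dense/output": ["lstm/output"],
    "loss": ["dense/output", "IteratorGetNext"],
}


def extract_subgraph(graph: Dict[str, List[str]],
                     outputs: Iterable[str]) -> Dict[str, List[str]]:
    """Keep only nodes that some output transitively depends on."""
    keep = set()
    queue = deque(outputs)
    while queue:
        node = queue.popleft()
        if node in keep:
            continue
        keep.add(node)
        queue.extend(graph[node])  # walk backward along input edges
    return {name: inputs for name, inputs in graph.items() if name in keep}


if __name__ == "__main__":
    print(sorted(extract_subgraph(GRAPH, ["dense/output"])))
    # ['dense/output', 'lstm/output', 'x']
```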
Edit: Update
It seems you could use pretty much any method you like to strip the TensorFlow graph down to just the inference components. For now I'm temporarily saving the model with Keras, clearing the TF session, then reloading from the Keras file and saving it "properly" in TF Serving format:
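```python
import os
import tempfile

import tensorflow as tf
from tensorflow.keras import backend as K

# `inference_model` (the trained Keras model) and `model_dir` come from the training script.
with tempfile.TemporaryDirectory() as tmpdir:
    tmpfilename = os.path.join(tmpdir, "model.h5")
    tf.keras.models.save_model(
        inference_model,
        tmpfilename,
        overwrite=True,
        include_optimizer=False,
    )
    K.clear_session()  # drop the training graph, including the PipeModeDataset ops
    inference_model = tf.keras.models.load_model(
        tmpfilename,
        # custom_objects={ ... } (If you need any)
    )

sess = K.get_session()
tf.saved_model.simple_save(
    sess,
    os.path.join(model_dir, "model/1"),
    inputs={"inputs": inference_model.input},
    outputs={t.name: t for t in inference_model.outputs},
)
```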
I'm sure there are more performant solutions - happy to hear of any!
The SageMaker endpoint fails to respond to requests with the following error: