aws / sagemaker-python-sdk

A library for training and deploying machine learning models on Amazon SageMaker
https://sagemaker.readthedocs.io/
Apache License 2.0

UnexpectedStatusException: Error for Training job. Reason: AlgorithmError: ExecuteUserScriptError: #2688

Closed: Electric-Dragon closed this issue 11 months ago

Electric-Dragon commented 3 years ago

Describe the bug
This training script runs successfully on a small dataset (~11MB) but fails on a large one (~5.4GB).

To reproduce

Training script:

import numpy as np
import typing
from typing import Any, Tuple
import tensorflow as tf
import tensorflow_text as tf_text
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import mixed_precision
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from pathlib import Path
import argparse
import json
import os
use_builtins = True
import subprocess
import sys

# mixed_precision.set_global_policy('mixed_float16')
# os.environ['TF_ENABLE_AUTO_MIXED_PRECISION'] = '1'

gpus = tf.config.list_physical_devices('GPU')
if gpus:
  try:
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    print(e)

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

class ShapeChecker():
  def __init__(self):
    self.shapes = {}

  def __call__(self, tensor, names, broadcast=False):
    if not tf.executing_eagerly():
      return

    if isinstance(names, str):
      names = (names,)

    shape = tf.shape(tensor)
    rank = tf.rank(tensor)

    if rank != len(names):
      raise ValueError(f'Rank mismatch:\n'
                       f'    found {rank}: {shape.numpy()}\n'
                       f'    expected {len(names)}: {names}\n')

    for i, name in enumerate(names):
      if isinstance(name, int):
        old_dim = name
      else:
        old_dim = self.shapes.get(name, None)
      new_dim = shape[i]

      if (broadcast and new_dim == 1):
        continue

      if old_dim is None:
        # If the axis name is new, add its length to the cache.
        self.shapes[name] = new_dim
        continue

      if new_dim != old_dim:
        raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
                         f"    found: {new_dim}\n"
                         f"    expected: {old_dim}\n")

def _parse_args():
    parser = argparse.ArgumentParser()

    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is an S3 path under the default bucket.
    parser.add_argument("--model_dir", type=str)
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAINING"))
    parser.add_argument("--hosts", type=list, default=json.loads(os.environ.get("SM_HOSTS")))
    parser.add_argument("--current-host", type=str, default=os.environ.get("SM_CURRENT_HOST"))

    return parser.parse_known_args()

def load_data(path):

  path = os.path.join(path, "data.txt")

  text = Path(path).read_text(encoding='utf-8')
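  # Note: read_text() materializes the whole file in memory at once, and the
  # split lists below coexist with it (plus the constant tensors that
  # from_tensor_slices builds from them in __main__), so peak memory is a
  # multiple of the file size.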

  lines = text.splitlines()
  pairs = [line.split('\t') for line in lines]

  inp = [inp for targ, inp in pairs]
  targ = [targ for targ, inp in pairs]

  return targ, inp

def tf_lower_and_split_punct(text):
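  # e.g. 'Hello!' -> '[START] hello ! [END]'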
  text = tf_text.normalize_utf8(text, 'NFKD')
  text = tf.strings.lower(text)
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

embedding_dim = 256
units = 1024

class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_vocab_size = input_vocab_size

    self.embedding = tf.keras.layers.Embedding(self.input_vocab_size,
                                               embedding_dim)

    self.gru = tf.keras.layers.GRU(self.enc_units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

  def call(self, tokens, state=None):
    shape_checker = ShapeChecker()
    shape_checker(tokens, ('batch', 's'))

    vectors = self.embedding(tokens)
    shape_checker(vectors, ('batch', 's', 'embed_dim'))

    output, state = self.gru(vectors, initial_state=state)
    shape_checker(output, ('batch', 's', 'enc_units'))
    shape_checker(state, ('batch', 'enc_units'))

    return output, state

class BahdanauAttention(tf.keras.layers.Layer):
  def __init__(self, units):
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units, use_bias=False)
    self.W2 = tf.keras.layers.Dense(units, use_bias=False)

    self.attention = tf.keras.layers.AdditiveAttention()

  def call(self, query, value, mask):
    shape_checker = ShapeChecker()
    shape_checker(query, ('batch', 't', 'query_units'))
    shape_checker(value, ('batch', 's', 'value_units'))
    shape_checker(mask, ('batch', 's'))

    # From Eqn. (4), `W1@ht`.
    w1_query = self.W1(query)
    shape_checker(w1_query, ('batch', 't', 'attn_units'))

    # From Eqn. (4), `W2@hs`.
    w2_key = self.W2(value)
    shape_checker(w2_key, ('batch', 's', 'attn_units'))

    query_mask = tf.ones(tf.shape(query)[:-1], dtype=bool)
    value_mask = mask

    context_vector, attention_weights = self.attention(
        inputs = [w1_query, value, w2_key],
        mask=[query_mask, value_mask],
        return_attention_scores = True,
    )
    shape_checker(context_vector, ('batch', 't', 'value_units'))
    shape_checker(attention_weights, ('batch', 't', 's'))

    return context_vector, attention_weights

class Decoder(tf.keras.layers.Layer):
  def __init__(self, output_vocab_size, embedding_dim, dec_units):
    super(Decoder, self).__init__()
    self.dec_units = dec_units
    self.output_vocab_size = output_vocab_size
    self.embedding_dim = embedding_dim

    # For Step 1. The embedding layer converts token IDs to vectors
    self.embedding = tf.keras.layers.Embedding(self.output_vocab_size,
                                               embedding_dim)

    # For Step 2. The RNN keeps track of what's been generated so far.
    self.gru = tf.keras.layers.GRU(self.dec_units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

    # For step 3. The RNN output will be the query for the attention layer.
    self.attention = BahdanauAttention(self.dec_units)

    # For step 4. Eqn. (3): converting `ct` to `at`
    self.Wc = tf.keras.layers.Dense(dec_units, activation=tf.math.tanh,
                                    use_bias=False)

    # For step 5. This fully connected layer produces the logits for each
    # output token.
    self.fc = tf.keras.layers.Dense(self.output_vocab_size)

class DecoderInput(typing.NamedTuple):
  new_tokens: Any
  enc_output: Any
  mask: Any

class DecoderOutput(typing.NamedTuple):
  logits: Any
  attention_weights: Any

def call(self,
         inputs: DecoderInput,
         state=None) -> Tuple[DecoderOutput, tf.Tensor]:
  shape_checker = ShapeChecker()
  shape_checker(inputs.new_tokens, ('batch', 't'))
  shape_checker(inputs.enc_output, ('batch', 's', 'enc_units'))
  shape_checker(inputs.mask, ('batch', 's'))

  if state is not None:
    shape_checker(state, ('batch', 'dec_units'))

  # Step 1. Lookup the embeddings
  vectors = self.embedding(inputs.new_tokens)
  shape_checker(vectors, ('batch', 't', 'embedding_dim'))

  # Step 2. Process one step with the RNN
  rnn_output, state = self.gru(vectors, initial_state=state)

  shape_checker(rnn_output, ('batch', 't', 'dec_units'))
  shape_checker(state, ('batch', 'dec_units'))

  # Step 3. Use the RNN output as the query for the attention over the
  # encoder output.
  context_vector, attention_weights = self.attention(
      query=rnn_output, value=inputs.enc_output, mask=inputs.mask)
  shape_checker(context_vector, ('batch', 't', 'dec_units'))
  shape_checker(attention_weights, ('batch', 't', 's'))

  # Step 4. Eqn. (3): Join the context_vector and rnn_output
  #     [ct; ht] shape: (batch, t, value_units + query_units)
  context_and_rnn_output = tf.concat([context_vector, rnn_output], axis=-1)

  # Step 4. Eqn. (3): `at = tanh(Wc@[ct; ht])`
  attention_vector = self.Wc(context_and_rnn_output)
  shape_checker(attention_vector, ('batch', 't', 'dec_units'))

  # Step 5. Generate logit predictions:
  logits = self.fc(attention_vector)
  shape_checker(logits, ('batch', 't', 'output_vocab_size'))

  return DecoderOutput(logits, attention_weights), state

class MaskedLoss(tf.keras.losses.Loss):
  def __init__(self):
    self.name = 'masked_loss'
    self.loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')

  def __call__(self, y_true, y_pred):
    shape_checker = ShapeChecker()
    shape_checker(y_true, ('batch', 't'))
    shape_checker(y_pred, ('batch', 't', 'logits'))

    # Calculate the loss for each item in the batch.
    loss = self.loss(y_true, y_pred)
    shape_checker(loss, ('batch', 't'))

    # Mask off the losses on padding.
    mask = tf.cast(y_true != 0, tf.float32)
    shape_checker(mask, ('batch', 't'))
    loss *= mask

    # Return the total.
    return tf.reduce_sum(loss)

class TrainTranslator(tf.keras.Model):
  def __init__(self, embedding_dim, units,
               input_text_processor,
               output_text_processor, 
               use_tf_function=True):
    super().__init__()
    # Build the encoder and decoder
    encoder = Encoder(len(input_text_processor.get_vocabulary()),
                      embedding_dim, units)
    decoder = Decoder(len(output_text_processor.get_vocabulary()),
                      embedding_dim, units)

    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor
    self.use_tf_function = use_tf_function
    self.shape_checker = ShapeChecker()

  def train_step(self, inputs):
    self.shape_checker = ShapeChecker()
    if self.use_tf_function:
      return self._tf_train_step(inputs)
    else:
      return self._train_step(inputs)

def _preprocess(self, input_text, target_text):
  self.shape_checker(input_text, ('batch',))
  self.shape_checker(target_text, ('batch',))

  # Convert the text to token IDs
  input_tokens = self.input_text_processor(input_text)
  target_tokens = self.output_text_processor(target_text)
  self.shape_checker(input_tokens, ('batch', 's'))
  self.shape_checker(target_tokens, ('batch', 't'))

  # Convert IDs to masks.
  input_mask = input_tokens != 0
  self.shape_checker(input_mask, ('batch', 's'))

  target_mask = target_tokens != 0
  self.shape_checker(target_mask, ('batch', 't'))

  return input_tokens, input_mask, target_tokens, target_mask

def _train_step(self, inputs):
  input_text, target_text = inputs  

  (input_tokens, input_mask,
   target_tokens, target_mask) = self._preprocess(input_text, target_text)

  max_target_length = tf.shape(target_tokens)[1]

  with tf.GradientTape() as tape:
    # Encode the input
    enc_output, enc_state = self.encoder(input_tokens)
    self.shape_checker(enc_output, ('batch', 's', 'enc_units'))
    self.shape_checker(enc_state, ('batch', 'enc_units'))

    # Initialize the decoder's state to the encoder's final state.
    # This only works if the encoder and decoder have the same number of
    # units.
    dec_state = enc_state
    loss = tf.constant(0.0)

    for t in tf.range(max_target_length-1):
      # Pass in two tokens from the target sequence:
      # 1. The current input to the decoder.
      # 2. The target for the decoder's next prediction.
      new_tokens = target_tokens[:, t:t+2]
      step_loss, dec_state = self._loop_step(new_tokens, input_mask,
                                             enc_output, dec_state)
      loss = loss + step_loss

    # Average the loss over all non-padding tokens.
    average_loss = loss / tf.reduce_sum(tf.cast(target_mask,tf.float32))

  # Apply an optimization step
  variables = self.trainable_variables 
  gradients = tape.gradient(average_loss, variables)
  self.optimizer.apply_gradients(zip(gradients, variables))

  # Return a dict mapping metric names to current value
  return {'batch_loss': average_loss}

def _loop_step(self, new_tokens, input_mask, enc_output, dec_state):
  input_token, target_token = new_tokens[:, 0:1], new_tokens[:, 1:2]

  # Run the decoder one step.
  decoder_input = DecoderInput(new_tokens=input_token,
                               enc_output=enc_output,
                               mask=input_mask)

  dec_result, dec_state = self.decoder(decoder_input, state=dec_state)
  self.shape_checker(dec_result.logits, ('batch', 't1', 'logits'))
  self.shape_checker(dec_result.attention_weights, ('batch', 't1', 's'))
  self.shape_checker(dec_state, ('batch', 'dec_units'))

  # `self.loss` returns the total for non-padded tokens
  y = target_token
  y_pred = dec_result.logits
  step_loss = self.loss(y, y_pred)

  return step_loss, dec_state

@tf.function(input_signature=[[tf.TensorSpec(dtype=tf.string, shape=[None]),
                               tf.TensorSpec(dtype=tf.string, shape=[None])]])
def _tf_train_step(self, inputs):
  return self._train_step(inputs)

class BatchLogs(tf.keras.callbacks.Callback):
  def __init__(self, key):
    self.key = key
    self.logs = []

  def on_train_batch_end(self, n, logs):
    self.logs.append(logs[self.key])

def train_model(train_translator, dataset, path, num=8):

#   tf.debugging.set_log_device_placement(True)
#   gpus = tf.config.list_physical_devices('GPU')
#   strategy = tf.distribute.MirroredStrategy(gpus)
#   with strategy.scope():
#   with tf.device('/GPU:0'):
  cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=path,
                                                   save_weights_only=True,
                                                   verbose=1)
  batch_loss = BatchLogs('batch_loss')
  train_translator.fit(dataset, epochs=num, callbacks=[batch_loss, cp_callback])

  return train_translator

class Translator(tf.Module):

  def __init__(self, encoder, decoder, input_text_processor,
               output_text_processor):
    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor

    self.output_token_string_from_index = (
        tf.keras.layers.experimental.preprocessing.StringLookup(
            vocabulary=output_text_processor.get_vocabulary(),
            mask_token='',
            invert=True))

    # The output should never generate padding, unknown, or start.
    index_from_string = tf.keras.layers.experimental.preprocessing.StringLookup(
        vocabulary=output_text_processor.get_vocabulary(), mask_token='')
    token_mask_ids = index_from_string(['', '[UNK]', '[START]']).numpy()

    token_mask = np.zeros([len(index_from_string.get_vocabulary())], dtype=bool)
    token_mask[np.array(token_mask_ids)] = True
    self.token_mask = token_mask

    self.start_token = index_from_string('[START]')
    self.end_token = index_from_string('[END]')

def tokens_to_text(self, result_tokens):
  shape_checker = ShapeChecker()
  shape_checker(result_tokens, ('batch', 't'))
  result_text_tokens = self.output_token_string_from_index(result_tokens)
  shape_checker(result_text_tokens, ('batch', 't'))

  result_text = tf.strings.reduce_join(result_text_tokens,
                                       axis=1, separator=' ')
  shape_checker(result_text, ('batch',))

  result_text = tf.strings.strip(result_text)
  shape_checker(result_text, ('batch',))
  return result_text

def sample(self, logits, temperature):
  shape_checker = ShapeChecker()
  # 't' is usually 1 here.
  shape_checker(logits, ('batch', 't', 'vocab'))
  shape_checker(self.token_mask, ('vocab',))

  token_mask = self.token_mask[tf.newaxis, tf.newaxis, :]
  shape_checker(token_mask, ('batch', 't', 'vocab'), broadcast=True)

  # Set the logits for all masked tokens to -inf, so they are never chosen.
  logits = tf.where(self.token_mask, -np.inf, logits)

  if temperature == 0.0:
    new_tokens = tf.argmax(logits, axis=-1)
  else: 
    logits = tf.squeeze(logits, axis=1)
    new_tokens = tf.random.categorical(logits/temperature,
                                        num_samples=1)

  shape_checker(new_tokens, ('batch', 't'))

  return new_tokens

def translate_unrolled(self,
                       input_text, *,
                       max_length=50,
                       return_attention=True,
                       temperature=1.0):
  batch_size = tf.shape(input_text)[0]
  input_tokens = self.input_text_processor(input_text)
  enc_output, enc_state = self.encoder(input_tokens)

  dec_state = enc_state
  new_tokens = tf.fill([batch_size, 1], self.start_token)

  result_tokens = []
  attention = []
  done = tf.zeros([batch_size, 1], dtype=tf.bool)

  for _ in range(max_length):
    dec_input = DecoderInput(new_tokens=new_tokens,
                             enc_output=enc_output,
                             mask=(input_tokens!=0))

    dec_result, dec_state = self.decoder(dec_input, state=dec_state)

    attention.append(dec_result.attention_weights)

    new_tokens = self.sample(dec_result.logits, temperature)

    # If a sequence produces an `end_token`, mark it `done`.
    done = done | (new_tokens == self.end_token)
    # Once a sequence is done it only produces 0-padding.
    new_tokens = tf.where(done, tf.constant(0, dtype=tf.int64), new_tokens)

    # Collect the generated tokens
    result_tokens.append(new_tokens)

    if tf.executing_eagerly() and tf.reduce_all(done):
      break

  result_tokens = tf.concat(result_tokens, axis=-1)
  result_text = self.tokens_to_text(result_tokens)

  if return_attention:
    attention_stack = tf.concat(attention, axis=1)
    return {'text': result_text, 'attention': attention_stack}
  else:
    return {'text': result_text}

@tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
def tf_translate(self, input_text):
  return self.translate(input_text)

def translate_symbolic(self,
                       input_text,
                       *,
                       max_length=50,
                       return_attention=True,
                       temperature=1.0):
  shape_checker = ShapeChecker()
  shape_checker(input_text, ('batch',))

  batch_size = tf.shape(input_text)[0]

  # Encode the input
  input_tokens = self.input_text_processor(input_text)
  shape_checker(input_tokens, ('batch', 's'))

  enc_output, enc_state = self.encoder(input_tokens)
  shape_checker(enc_output, ('batch', 's', 'enc_units'))
  shape_checker(enc_state, ('batch', 'enc_units'))

  # Initialize the decoder
  dec_state = enc_state
  new_tokens = tf.fill([batch_size, 1], self.start_token)
  shape_checker(new_tokens, ('batch', 't1'))

  # Initialize the accumulators
  result_tokens = tf.TensorArray(tf.int64, size=1, dynamic_size=True)
  attention = tf.TensorArray(tf.float32, size=1, dynamic_size=True)
  done = tf.zeros([batch_size, 1], dtype=tf.bool)
  shape_checker(done, ('batch', 't1'))

  for t in tf.range(max_length):
    dec_input = DecoderInput(
        new_tokens=new_tokens, enc_output=enc_output, mask=(input_tokens != 0))

    dec_result, dec_state = self.decoder(dec_input, state=dec_state)

    shape_checker(dec_result.attention_weights, ('batch', 't1', 's'))
    attention = attention.write(t, dec_result.attention_weights)

    new_tokens = self.sample(dec_result.logits, temperature)
    shape_checker(dec_result.logits, ('batch', 't1', 'vocab'))
    shape_checker(new_tokens, ('batch', 't1'))

    # If a sequence produces an `end_token`, mark it `done`.
    done = done | (new_tokens == self.end_token)
    # Once a sequence is done it only produces 0-padding.
    new_tokens = tf.where(done, tf.constant(0, dtype=tf.int64), new_tokens)

    # Collect the generated tokens
    result_tokens = result_tokens.write(t, new_tokens)

    if tf.reduce_all(done):
      break

  # Convert the list of generated token ids to a list of strings.
  result_tokens = result_tokens.stack()
  shape_checker(result_tokens, ('t', 'batch', 't0'))
  result_tokens = tf.squeeze(result_tokens, -1)
  result_tokens = tf.transpose(result_tokens, [1, 0])
  shape_checker(result_tokens, ('batch', 't'))

  result_text = self.tokens_to_text(result_tokens)
  shape_checker(result_text, ('batch',))

  if return_attention:
    attention_stack = attention.stack()
    shape_checker(attention_stack, ('t', 'batch', 't1', 's'))

    attention_stack = tf.squeeze(attention_stack, 2)
    shape_checker(attention_stack, ('t', 'batch', 's'))

    attention_stack = tf.transpose(attention_stack, [1, 0, 2])
    shape_checker(attention_stack, ('batch', 't', 's'))

    return {'text': result_text, 'attention': attention_stack}
  else:
    return {'text': result_text}

@tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
def tf_translate(self, input_text):
  return self.translate(input_text)

def plot_attention(attention, sentence, predicted_sentence):
  sentence = tf_lower_and_split_punct(sentence).numpy().decode().split()
  predicted_sentence = predicted_sentence.numpy().decode().split() + ['[END]']
  fig = plt.figure(figsize=(10, 10))
  ax = fig.add_subplot(1, 1, 1)

  attention = attention[:len(predicted_sentence), :len(sentence)]

  ax.matshow(attention, cmap='viridis', vmin=0.0)

  fontdict = {'fontsize': 14}

  ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
  ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

  ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
  ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

  ax.set_xlabel('Input text')
  ax.set_ylabel('Output text')
  plt.suptitle('Attention weights')

# reloaded = tf.saved_model.load('translator')
# result = reloaded.tf_translate(three_input_text)

# result = reloaded.tf_translate(['The kitchen stinks'])
# print(result['text'])

if __name__ == "__main__":

#   install('tensorflow_text')
#   install('matplotlib')
#   import tensorflow_text as tf_text
#   import matplotlib.pyplot as plt
#   import matplotlib.ticker as ticker

  args, unknown = _parse_args()

  print(args.train)

  targ, inp = load_data(args.train)

  BUFFER_SIZE = len(inp)
  BATCH_SIZE = 64

  dataset = tf.data.Dataset.from_tensor_slices((inp, targ)).shuffle(BUFFER_SIZE)
  dataset = dataset.batch(BATCH_SIZE)

  # Grab one example batch for the smoke tests below.
  for example_input_batch, example_target_batch in dataset.take(1):
    break

  max_vocab_size = 500000

  input_text_processor = preprocessing.TextVectorization(
      standardize=tf_lower_and_split_punct, max_tokens=max_vocab_size)
  input_text_processor.adapt(inp)

  output_text_processor = preprocessing.TextVectorization(
      standardize=tf_lower_and_split_punct, max_tokens=max_vocab_size)
  output_text_processor.adapt(targ)

  input_vocab = np.array(input_text_processor.get_vocabulary())

  embedding_dim = 128
  units = 512

  example_tokens = input_text_processor(example_input_batch)

  encoder = Encoder(len(input_text_processor.get_vocabulary()),embedding_dim, units)
  example_enc_output, example_enc_state = encoder(example_tokens)

  attention_layer = BahdanauAttention(units)

  example_attention_query = tf.random.normal(shape=[len(example_tokens), 2, 10])

  context_vector, attention_weights = attention_layer(
      query=example_attention_query,
      value=example_enc_output,
      mask=(example_tokens != 0))

  attention_slice = attention_weights[0, 0].numpy()
  attention_slice = attention_slice[attention_slice != 0]

  Decoder.call = call
  decoder = Decoder(len(output_text_processor.get_vocabulary()),embedding_dim, units)

  example_output_tokens = output_text_processor(example_target_batch)

  start_index = output_text_processor._index_lookup_layer('[START]').numpy()
  first_token = tf.constant([[start_index]] * example_output_tokens.shape[0])

  dec_result, dec_state = decoder(
      inputs=DecoderInput(new_tokens=first_token,
                          enc_output=example_enc_output,
                          mask=(example_tokens != 0)),
      state=example_enc_state)

  sampled_token = tf.random.categorical(dec_result.logits[:, 0, :], num_samples=1)

  vocab = np.array(output_text_processor.get_vocabulary())
  first_word = vocab[sampled_token.numpy()]
  first_word[:5]

  dec_result, dec_state = decoder(
      DecoderInput(sampled_token, example_enc_output,
                   mask=(example_tokens != 0)),
      state=dec_state)

  TrainTranslator._preprocess = _preprocess
  TrainTranslator._train_step = _train_step
  TrainTranslator._loop_step = _loop_step

  translator = TrainTranslator(embedding_dim, units,
                               input_text_processor=input_text_processor,
                               output_text_processor=output_text_processor,
                               use_tf_function=False)
  opt = tf.keras.optimizers.Adam()
#     opt = tf.train.experimental.enable_mixed_precision_graph_rewrite(opt)
  translator.compile(optimizer=opt, loss=MaskedLoss())
  np.log(len(output_text_processor.get_vocabulary()))  # expected initial loss: ln(vocab_size) for uniform logits

  TrainTranslator._tf_train_step = _tf_train_step

  translator.use_tf_function = True

  translator.train_step([example_input_batch, example_target_batch])

#  tf.debugging.set_log_device_placement(True)
#  gpus = tf.config.list_logical_devices('GPU')
#  strategy = tf.distribute.MirroredStrategy(gpus)
#  with strategy.scope():

  with tf.device('/device:GPU:0'):

    train_translator = TrainTranslator(
        embedding_dim, units,
        input_text_processor=input_text_processor,
        output_text_processor=output_text_processor)

    opt = tf.keras.optimizers.Adam()
#    opt = tf.train.experimental.enable_mixed_precision_graph_rewrite(opt)

    train_translator.compile(optimizer=opt,loss=MaskedLoss())

    batch_loss = BatchLogs('batch_loss')

    train_translator = train_model(
        train_translator, dataset,
        os.path.join(args.sm_model_dir, "checkPoints/cp.ckpt"), num=30)

  translator = Translator(
    encoder=train_translator.encoder,
    decoder=train_translator.decoder,
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor,
  )

  Translator.tokens_to_text = tokens_to_text

  example_output_tokens = tf.random.uniform(
      shape=[5, 2], minval=0, dtype=tf.int64,
      maxval=len(output_text_processor.get_vocabulary()))
  translator.tokens_to_text(example_output_tokens).numpy()

  Translator.sample = sample

  example_logits = tf.random.normal([5, 1, len(output_text_processor.get_vocabulary())])
  example_output_tokens = translator.sample(example_logits, temperature=1.0)
  example_output_tokens

  Translator.translate = translate_unrolled
  Translator.tf_translate = tf_translate
  Translator.translate = translate_symbolic

  if args.current_host == args.hosts[0]:
    tf.saved_model.save(translator,
                        os.path.join(args.sm_model_dir, "conversationModel"),
                        signatures={'serving_default': translator.tf_translate})
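
One thing worth noting at the failing scale: load_data() reads the whole ~5.4GB file into memory, and Dataset.from_tensor_slices embeds those lists in the graph as constant tensors, so host memory use is several times the file size before training even starts. A streaming tf.data pipeline avoids that; the sketch below assumes the same one-pair-per-line, tab-separated layout of data.txt (the helper name make_dataset is illustrative, not part of the original script):

import os
import tensorflow as tf

def make_dataset(path, batch_size=64, shuffle_buffer=50000):
  # Stream data.txt line by line instead of materializing it in memory.
  lines = tf.data.TextLineDataset(os.path.join(path, "data.txt"))

  def split_pair(line):
    # Each line is "<target>\t<input>"; return (input, target) to match the
    # (input_text, target_text) tuples that _train_step unpacks.
    parts = tf.strings.split(line, '\t')
    return parts[1], parts[0]

  return (lines
          .map(split_pair, num_parallel_calls=tf.data.experimental.AUTOTUNE)
          .shuffle(shuffle_buffer)
          .batch(batch_size)
          .prefetch(tf.data.experimental.AUTOTUNE))

TextVectorization.adapt() also accepts a tf.data.Dataset, so the vocabularies can be built from dataset.map(lambda inp, targ: inp) without ever holding the Python lists.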

Jupyter Notebook code:

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="trainConversationModel.py",
    role=role,
    dependencies=['requirements.txt'],
    instance_count=1,
    base_job_name='conversational-model-training',  # training job names cannot contain spaces; the estimator argument is base_job_name
    instance_type="ml.p3.2xlarge",
    framework_version="2.4",
    py_version="py37",
    script_mode=True,
    max_run=126000
)

estimator.fit(training_data_uri)
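
A side note on checkpointing: files written under args.sm_model_dir only reach S3 when the job ends, so nothing is persisted mid-run. SageMaker managed checkpointing syncs a local directory to S3 while the job runs; a sketch, assuming the rest of the estimator stays as above (the bucket path is a placeholder):

estimator = TensorFlow(
    entry_point="trainConversationModel.py",
    role=role,
    dependencies=['requirements.txt'],
    instance_count=1,
    instance_type="ml.p3.2xlarge",
    framework_version="2.4",
    py_version="py37",
    max_run=126000,
    # Anything the script writes here inside the container...
    checkpoint_local_path="/opt/ml/checkpoints",
    # ...is synced to this S3 prefix during training.
    checkpoint_s3_uri="s3://<your-bucket>/conversation-model/checkpoints",
)

The ModelCheckpoint path in the training script would then point at /opt/ml/checkpoints instead of args.sm_model_dir.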

Expected behavior
The model is supposed to train (it trains successfully if the dataset size is small).

Logs

2021-10-06 12:20:18,943 sagemaker-training-toolkit INFO     Invoking user script

Training Env:

{
    "additional_framework_parameters": {},
    "channel_input_dirs": {
        "training": "/opt/ml/input/data/training"
    },
    "current_host": "algo-1",
    "framework_module": "sagemaker_tensorflow_container.training:main",
    "hosts": [
        "algo-1"
    ],
    "hyperparameters": {
        "model_dir": "s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"
    },
    "input_config_dir": "/opt/ml/input/config",
    "input_data_config": {
        "training": {
            "TrainingInputMode": "File",
            "S3DistributionType": "FullyReplicated",
            "RecordWrapperType": "None"
        }
    },
    "input_dir": "/opt/ml/input",
    "is_master": true,
    "job_name": "tensorflow-training-2021-10-06-12-13-05-138",
    "log_level": 20,
    "master_hostname": "algo-1",
    "model_dir": "/opt/ml/model",
    "module_dir": "s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/source/sourcedir.tar.gz",
    "module_name": "trainConversationModel",
    "network_interface_name": "eth0",
    "num_cpus": 8,
    "num_gpus": 1,
    "output_data_dir": "/opt/ml/output/data",
    "output_dir": "/opt/ml/output",
    "output_intermediate_dir": "/opt/ml/output/intermediate",
    "resource_config": {
        "current_host": "algo-1",
        "hosts": [
            "algo-1"
        ],
        "network_interface_name": "eth0"
    },
    "user_entry_point": "trainConversationModel.py"
}

Environment variables:

SM_HOSTS=["algo-1"]
SM_NETWORK_INTERFACE_NAME=eth0
SM_HPS={"model_dir":"s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"}
SM_USER_ENTRY_POINT=trainConversationModel.py
SM_FRAMEWORK_PARAMS={}
SM_RESOURCE_CONFIG={"current_host":"algo-1","hosts":["algo-1"],"network_interface_name":"eth0"}
SM_INPUT_DATA_CONFIG={"training":{"RecordWrapperType":"None","S3DistributionType":"FullyReplicated","TrainingInputMode":"File"}}
SM_OUTPUT_DATA_DIR=/opt/ml/output/data
SM_CHANNELS=["training"]
SM_CURRENT_HOST=algo-1
SM_MODULE_NAME=trainConversationModel
SM_LOG_LEVEL=20
SM_FRAMEWORK_MODULE=sagemaker_tensorflow_container.training:main
SM_INPUT_DIR=/opt/ml/input
SM_INPUT_CONFIG_DIR=/opt/ml/input/config
SM_OUTPUT_DIR=/opt/ml/output
SM_NUM_CPUS=8
SM_NUM_GPUS=1
SM_MODEL_DIR=/opt/ml/model
SM_MODULE_DIR=s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/source/sourcedir.tar.gz
SM_TRAINING_ENV={"additional_framework_parameters":{},"channel_input_dirs":{"training":"/opt/ml/input/data/training"},"current_host":"algo-1","framework_module":"sagemaker_tensorflow_container.training:main","hosts":["algo-1"],"hyperparameters":{"model_dir":"s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"},"input_config_dir":"/opt/ml/input/config","input_data_config":{"training":{"RecordWrapperType":"None","S3DistributionType":"FullyReplicated","TrainingInputMode":"File"}},"input_dir":"/opt/ml/input","is_master":true,"job_name":"tensorflow-training-2021-10-06-12-13-05-138","log_level":20,"master_hostname":"algo-1","model_dir":"/opt/ml/model","module_dir":"s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/source/sourcedir.tar.gz","module_name":"trainConversationModel","network_interface_name":"eth0","num_cpus":8,"num_gpus":1,"output_data_dir":"/opt/ml/output/data","output_dir":"/opt/ml/output","output_intermediate_dir":"/opt/ml/output/intermediate","resource_config":{"current_host":"algo-1","hosts":["algo-1"],"network_interface_name":"eth0"},"user_entry_point":"trainConversationModel.py"}
SM_USER_ARGS=["--model_dir","s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"]
SM_OUTPUT_INTERMEDIATE_DIR=/opt/ml/output/intermediate
SM_CHANNEL_TRAINING=/opt/ml/input/data/training
SM_HP_MODEL_DIR=s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model
PYTHONPATH=/opt/ml/code:/usr/local/bin:/usr/local/lib/python37.zip:/usr/local/lib/python3.7:/usr/local/lib/python3.7/lib-dynload:/usr/local/lib/python3.7/site-packages

Invoking script with the following command:

/usr/local/bin/python3.7 trainConversationModel.py --model_dir s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model

1 Physical GPUs, 1 Logical GPUs
/opt/ml/input/data/training
2021-10-06 12:25:54,155 sagemaker-training-toolkit ERROR    ExecuteUserScriptError:
Command "/usr/local/bin/python3.7 trainConversationModel.py --model_dir s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"

2021-10-06 12:26:13 Uploading - Uploading generated training model
2021-10-06 12:26:40 Failed - Training job failed

---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
<ipython-input-7-b3174dee663f> in <module>
----> 1 estimator.fit(training_data_uri)

~/anaconda3/envs/tensorflow2_p37/lib/python3.7/site-packages/sagemaker/estimator.py in fit(self, inputs, wait, logs, job_name, experiment_config)
    691         self.jobs.append(self.latest_training_job)
    692         if wait:
--> 693             self.latest_training_job.wait(logs=logs)
    694 
    695     def _compilation_job_name(self):

~/anaconda3/envs/tensorflow2_p37/lib/python3.7/site-packages/sagemaker/estimator.py in wait(self, logs)
   1651         # If logs are requested, call logs_for_jobs.
   1652         if logs != "None":
-> 1653             self.sagemaker_session.logs_for_job(self.job_name, wait=True, log_type=logs)
   1654         else:
   1655             self.sagemaker_session.wait_for_job(self.job_name)

~/anaconda3/envs/tensorflow2_p37/lib/python3.7/site-packages/sagemaker/session.py in logs_for_job(self, job_name, wait, poll, log_type)
   3722 
   3723         if wait:
-> 3724             self._check_job_status(job_name, description, "TrainingJobStatus")
   3725             if dot:
   3726                 print()

~/anaconda3/envs/tensorflow2_p37/lib/python3.7/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
   3282                 ),
   3283                 allowed_statuses=["Completed", "Stopped"],
-> 3284                 actual_status=status,
   3285             )
   3286 

UnexpectedStatusException: Error for Training job tensorflow-training-2021-10-06-12-13-05-138: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
Command "/usr/local/bin/python3.7 trainConversationModel.py --model_dir s3://sagemaker-ap-northeast-1-228620936244/tensorflow-training-2021-10-06-12-13-05-138/model"

Additional context
The script runs successfully on a small dataset.

pedro-twaice commented 2 years ago

Hi @Electric-Dragon, I am having quite a similar problem. Were you able to find a solution to yours? :)

Electric-Dragon commented 2 years ago

@pedro-twaice unfortunately no :/

pedro-twaice commented 2 years ago

@Electric-Dragon uhm okay!! Thanks for the reply :) In case I find any solution, I'll post it here

Electric-Dragon commented 2 years ago

@pedro-twaice Okay, thanks a lot!

mufaddal-rohawala commented 11 months ago

Hi, thanks for using SageMaker! Unfortunately this error comes from the backend training service rather than from the SageMaker Python SDK itself. The suggestion here is to contact AWS Support to engage the backend service team for more information on the UnexpectedStatusException.
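
To pull the service-side failure reason directly, DescribeTrainingJob is usually the quickest route; a minimal sketch, using the job name from the log above:

import boto3

sm = boto3.client("sagemaker")
desc = sm.describe_training_job(
    TrainingJobName="tensorflow-training-2021-10-06-12-13-05-138")
print(desc["TrainingJobStatus"])      # e.g. "Failed"
print(desc.get("FailureReason", ""))  # full AlgorithmError message, if any

The container's complete stdout/stderr, including the Python traceback that is cut off in the logs above, is in the CloudWatch log group /aws/sagemaker/TrainingJobs.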

Please feel free to reopen this issue if it turns out there is an underlying SageMaker Python SDK problem.