Because we want to export the model together with our preprocessing steps, we need to guarantee that the model's input names match the transformed feature names produced by `preprocessing_fn()`. In our example model, we reuse the `transformed_name()` function to add the suffix `_xf` to our features.
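For instance, the raw `product` feature key maps to the input name the model must expose. A self-contained two-line illustration (the same helper is defined later in the module file):

```python
def transformed_name(key):  # helper reused from the preprocessing module
    return key + '_xf'

transformed_name('product')  # -> 'product_xf', the input name the model expects
```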
The module file needs the following imports (`datetime` and `tensorflow_transform` are used by `run_fn()` below):

```python
import datetime

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_transform as tft
```
- The Trainer component requires the following inputs:
- The data schema generated by the data validation step
- The transformed data and its preprocessing graph
- Training parameters (e.g., the number of training steps)
- A module file containing a `run_fn()` function, which defines the training process
- The `run_fn()` function is a generic entry point to the training steps, not something tf.Keras specific. It carries out the following steps:
- Loading the training and validation data (or the data generator)
- Defining the model architecture and compiling the model
- Training the model
- Exporting the model to be evaluated in the next pipeline step
```python
def run_fn(fn_args):
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    # Call input_fn to get the data generators
    train_dataset = input_fn(fn_args.train_files, tf_transform_output)
    eval_dataset = input_fn(fn_args.eval_files, tf_transform_output)

    # Call get_model to get the compiled Keras model
    model = get_model()

    # Log the training progress for TensorBoard
    log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, update_freq='batch'
    )

    # Train the model using the number of training and
    # evaluation steps passed by the Trainer component
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        callbacks=[tensorboard_callback]
    )

    # Define the model signature, which includes the serving
    # function described below
    signatures = {
        'serving_default':
            _get_serve_tf_examples_fn(
                model,
                tf_transform_output
            ).get_concrete_function(
                tf.TensorSpec(
                    shape=[None],
                    dtype=tf.string,
                    name='examples'
                )
            )
    }
    # Export the model for evaluation in the next pipeline step
    model.save(fn_args.serving_model_dir,
               save_format='tf', signatures=signatures)
```
- Applying the preprocessing graph to model inputs: `_get_serve_tf_examples_fn()` loads the preprocessing graph and applies it to raw request data before running the prediction:
```python
def _get_serve_tf_examples_fn(model, tf_transform_output):
    # Load the preprocessing graph
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        # The label isn't provided at serving time
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)

        # Parse the raw tf.Example records from the request
        parsed_features = tf.io.parse_example(
            serialized_tf_examples, feature_spec
        )

        # Apply the preprocessing transformation to raw data
        transformed_features = model.tft_layer(parsed_features)

        # Perform prediction with the preprocessed data
        outputs = model(transformed_features)
        return {'outputs': outputs}

    return serve_tf_examples_fn
```
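Once exported, the SavedModel's `serving_default` signature accepts a batch of serialized `tf.Example` strings. A minimal sketch of loading and inspecting the exported model, assuming a hypothetical export path:

```python
import tensorflow as tf

# Hypothetical path; in the pipeline this is fn_args.serving_model_dir
loaded = tf.saved_model.load('serving_model_dir')
serve_fn = loaded.signatures['serving_default']

# The signature expects serialized tf.Example strings under the name 'examples'
print(serve_fn.structured_input_signature)
# prediction = serve_fn(examples=tf.constant([example.SerializeToString()]))
```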
#### Running the Trainer Component
We bring everything together and run the Trainer component with the following inputs:
- The Python module file, here saved as `train.py`, containing the `run_fn()`, `input_fn()`, `_get_serve_tf_examples_fn()`, and other associated functions we discussed earlier
- The transformed examples generated by the Transform component
- The transform graph generated by the Transform component
- The schema generated by the data validation component
- The number of training and evaluation steps
```python
from tfx.components import Trainer
from tfx.components.base import executor_spec
# Load the GenericExecutor to override the training executor
from tfx.components.trainer.executor import GenericExecutor
from tfx.proto import trainer_pb2
```
- Alternatively, we can run TensorBoard outside the notebook (e.g., with `tensorboard --logdir <log_dir>`) and then connect to `http://localhost:6006/` to view it. This gives us a larger window to view the details.
- We compile our model by selecting the loss function, the optimizer, and the metrics:
- Loss function: This measures how far the model's predictions are from the labels; training seeks to minimize it. Three of the most common loss functions (illustrated in the sketch after this list) are:
  - `binary_crossentropy`: The default loss function for binary classification.
  - `categorical_crossentropy`: The default loss function for multiclass classification with one-hot encoded labels.
  - `sparse_categorical_crossentropy`: Used for multiclass classification with integer labels, so no one-hot encoding is needed.
- Optimizer: The optimizer updates the model's weights, based on the data and the loss function, in order to minimize the loss.
- Metrics: These monitor the model during training and testing. The "accuracy" option measures the fraction of examples that are correctly classified.
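A brief illustrative sketch of how each loss is selected at compile time (the models here are hypothetical placeholders; our example model uses `binary_crossentropy`, as shown in `get_model()` later in this section):

```python
import tensorflow as tf

# Binary classification: a single sigmoid output, labels are 0/1
binary_model = tf.keras.Sequential(
    [tf.keras.layers.Dense(1, activation='sigmoid')])
binary_model.compile(optimizer='adam',
                     loss='binary_crossentropy',
                     metrics=['accuracy'])

# Multiclass classification with one-hot encoded labels
onehot_model = tf.keras.Sequential(
    [tf.keras.layers.Dense(10, activation='softmax')])
onehot_model.compile(optimizer='adam',
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])

# Multiclass classification with integer class labels (no one-hot encoding)
sparse_model = tf.keras.Sequential(
    [tf.keras.layers.Dense(10, activation='softmax')])
sparse_model.compile(optimizer='adam',
                     loss='sparse_categorical_crossentropy',
                     metrics=['accuracy'])
```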
The module file defines the label and the feature constants introduced by the preprocessing step:

```python
LABEL_KEY = 'consumer_disputed'

# Feature name, feature dimensionality
ONE_HOT_FEATURES = {
    "product": 11,
    "sub_product": 44,
    "company_response": 5,
    "state": 56,
    "issue": 88
}

# Feature name, bucket count
BUCKET_FEATURES = {
    "zip_code": 10
}

# Feature name, value is unused
TEXT_FEATURES = {
    "consumer_complaint_narrative": None
}


def transformed_name(key):
    return key + '_xf'
```
The `get_model()` function assembles a wide-and-deep architecture with the Keras functional API:

```python
def get_model():
    # One-hot categorical features
    input_features = []
    # Loop over the features and create an input for each feature
    for key, dim in ONE_HOT_FEATURES.items():
        input_features.append(
            tf.keras.Input(shape=(dim + 1,), name=transformed_name(key))
        )

    # Adding bucketized features
    for key, dim in BUCKET_FEATURES.items():
        input_features.append(
            tf.keras.Input(shape=(dim + 1,), name=transformed_name(key))
        )

    # Adding text input features
    input_texts = []
    for key in TEXT_FEATURES.keys():
        input_texts.append(
            tf.keras.Input(shape=(1,),
                           name=transformed_name(key),
                           dtype=tf.string)
        )

    inputs = input_features + input_texts

    # Embed text features: load the tf.hub module of the
    # Universal Sentence Encoder model
    MODULE_URL = "https://tfhub.dev/google/universal-sentence-encoder/4"
    embed = hub.KerasLayer(MODULE_URL)

    # Keras inputs are two-dimensional, but the encoder
    # expects one-dimensional inputs
    reshaped_narrative = tf.reshape(input_texts[0], [-1])
    embed_narrative = embed(reshaped_narrative)
    deep_ff = tf.keras.layers.Reshape((512,),
                                      input_shape=(1, 512))(embed_narrative)

    deep = tf.keras.layers.Dense(256, activation='relu')(deep_ff)
    deep = tf.keras.layers.Dense(64, activation='relu')(deep)
    deep = tf.keras.layers.Dense(16, activation='relu')(deep)

    wide_ff = tf.keras.layers.concatenate(input_features)
    wide = tf.keras.layers.Dense(16, activation='relu')(wide_ff)

    both = tf.keras.layers.concatenate([deep, wide])
    output = tf.keras.layers.Dense(1, activation='sigmoid')(both)

    # Assemble the model graph with the functional API
    keras_model = tf.keras.models.Model(inputs, output)
    keras_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        metrics=[
            tf.keras.metrics.BinaryAccuracy(),
            tf.keras.metrics.TruePositives()
        ])
    return keras_model
```
The `input_fn()` function loads the compressed, transformed datasets in batches:

```python
def input_fn(file_pattern, tf_transform_output, batch_size=32):
    transformed_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy()
    )
    # The dataset will be batched into the correct batch size
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transformed_feature_spec,
        reader=_gzip_reader_fn,
        label_key=transformed_name(LABEL_KEY)
    )
    return dataset
```
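The code above references a `_gzip_reader_fn()` helper that isn't shown in this section. A minimal sketch, assuming the transformed examples are stored as GZIP-compressed TFRecord files (the default output of the Transform component):

```python
def _gzip_reader_fn(filenames):
    # Read the TFRecord files written by the Transform component,
    # which are GZIP-compressed
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
```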
We then set the number of training and evaluation steps, point the Trainer at our module file, and run the component. The remaining `Trainer` arguments follow the input list above; this is a sketch assuming the upstream Transform and SchemaGen components are available as `transform` and `schema_gen`, and that we are running in an interactive context (`context`):

```python
import os

TRAINING_STEPS = 1000
EVALUATION_STEPS = 100

base_dir = os.getcwd()
train_file = os.path.join(base_dir, 'components/train.py')

trainer = Trainer(
    module_file=train_file,
    # Override the executor to load the run_fn() function
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=TRAINING_STEPS),
    eval_args=trainer_pb2.EvalArgs(num_steps=EVALUATION_STEPS)
)
context.run(trainer)
```

We already added the TensorBoard callback in the `run_fn()` function to log the training to a folder we specify, so we can load TensorBoard in the notebook and point it at those training logs:

```python
%load_ext tensorboard
%tensorboard --logdir {model_dir}
```
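Here `{model_dir}` is assumed to point at the directory containing the training logs. In an interactive context it can typically be retrieved from the Trainer's outputs; a sketch, noting that the exact output key has varied across TFX versions:

```python
# Hypothetical retrieval of the Trainer's output location in a notebook;
# the output key ('model' or 'output') depends on the TFX version
model_dir = trainer.outputs['model'].get()[0].uri
```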