In our experience, TFT has the steepest learning curve of any TFX library because preprocessing steps must be expressed through TensorFlow operations.
In return, TFT helps with:
Preprocessing your data efficiently in the context of the entire dataset
Scaling the preprocessing steps effectively
Avoiding potential training-serving skew
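The first and third points can be illustrated without TFT at all. The following is a minimal plain-Python sketch, not TFT's implementation: a hypothetical `analyze()` makes one full pass over the training data to collect statistics, and a hypothetical `transform()` applies those statistics to any record, so training and serving share the same values.

```python
def analyze(values):
    """Full pass over the training data: collect the statistics we need."""
    return {"min": min(values), "max": max(values)}


def transform(value, stats):
    """Scale one value to [0, 1] using statistics from the analysis pass."""
    span = stats["max"] - stats["min"]
    if span == 0:
        return 0.0
    return (value - stats["min"]) / span


training_values = [10.0, 20.0, 30.0, 50.0]
stats = analyze(training_values)

# Training time: every record is scaled with the dataset-wide statistics.
scaled_training = [transform(v, stats) for v in training_values]

# Serving time: a single request reuses the same statistics, which is
# what keeps training and serving consistent.
scaled_request = transform(40.0, stats)
```

The serving request never recomputes the minimum or maximum; it only reuses what the analysis pass produced.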
Data Preprocessing with TFT: TFT processes the data that we ingested into our pipeline, using the previously generated dataset schema, and outputs two artifacts:
Preprocessed training and evaluation datasets in the TFRecord format. The produced datasets can be consumed downstream in the Trainer component of our pipeline.
Exported preprocessing graph (with assets), which will be used when we export our machine learning model.
Installation
$ pip install tensorflow-transform
Preprocessing Strategies
In the following code, we define our features. For simpler processing later on, we group the input feature names in dictionaries representing each transform output data type: one-hot features, bucketized features, and raw string representations:
import tensorflow as tf
import tensorflow_transform as tft
Before we can loop over these input feature dictionaries, let's define a few helper functions to transform the data efficiently. It is good practice to rename the features by appending a suffix to the feature name (e.g., _xf). The suffix helps distinguish whether errors originate from input or output features and prevents us from accidentally using a nontransformed feature in our actual model:
```python
def transformed_name(key):
    return key + '_xf'
```
Some of our features are of a sparse nature, but TFT expects the transformation outputs to be dense. We can use the following helper function to convert sparse to dense features and to fill the missing values with a default value:
def fill_in_missing(x):
    # Use an empty string for string features and 0 for numeric features.
    default_value = '' if x.dtype == tf.string else 0
    if isinstance(x, tf.SparseTensor):
        # Converts a SparseTensor into a dense tensor.
        x = tf.sparse.to_dense(
            # Represents a sparse tensor.
            tf.SparseTensor(
                indices=x.indices,
                values=x.values,
                dense_shape=[x.dense_shape[0], 1]
            ),
            default_value=default_value
        )
    # Removes dimensions of size 1 from the shape of a tensor.
    return tf.squeeze(x, axis=1)
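To make the sparse-to-dense step concrete, here is a plain-Python sketch of what the conversion does to a single column. It does not use TensorFlow; the function name `to_dense_column` and the example values are assumptions for illustration. The sparse input is described by row indices and values, and every row without a value receives the default.

```python
def to_dense_column(row_indices, values, num_rows, default_value):
    """Build a dense column of length num_rows from sparse (row, value) pairs."""
    dense = [default_value] * num_rows
    for row, value in zip(row_indices, values):
        dense[row] = value
    return dense


# Rows 0 and 2 have a value; row 1 is missing and gets the default.
dense_strings = to_dense_column([0, 2], ["CA", "WA"], num_rows=3, default_value="")
dense_numbers = to_dense_column([1], [7], num_rows=2, default_value=0)
```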
In our model, we represent most input features as one-hot encoded vectors. The following helper function converts a given index to a one-hot encoded representation and returns the vector:
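The body of `convert_num_to_one_hot()` does not appear in this text. A minimal sketch consistent with how it is called later (an index tensor plus a `num_labels` argument) could look like the following; treat it as an assumption rather than the original helper.

```python
import tensorflow as tf


def convert_num_to_one_hot(label_tensor, num_labels=2):
    """Convert a batch of integer indices to one-hot vectors.

    Assumed implementation: the original helper is not shown in this text.
    """
    one_hot_tensor = tf.one_hot(label_tensor, num_labels)
    # Flatten any extra dimension so the result has shape [batch, num_labels].
    return tf.reshape(one_hot_tensor, [-1, num_labels])


example = convert_num_to_one_hot(tf.constant([0, 2]), num_labels=3)
```

Indices outside the range `0..num_labels-1` produce an all-zero row with `tf.one_hot`, so the caller has to pick `num_labels` large enough for every index it expects.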
Before we can process our features, we need one more helper function to convert zip codes represented as strings to float values. Our dataset lists zip codes as follows:
zip codes
97XXX
98XXX
To bucketize records with missing zip codes correctly, we replaced the placeholders with zeros and bucketized the resulting floats into 10 buckets:
def convert_zip_code(zip_code):
    # Substitute a default for missing zip codes. tf.where works elementwise
    # on the whole batch, unlike a Python `if` on a tensor.
    zip_code = tf.where(tf.equal(zip_code, ''), '00000', zip_code)
    # Replace each 'X' placeholder with a zero, e.g. '97XXX' -> '97000'.
    zip_code = tf.strings.regex_replace(zip_code, r'X', '0')
    # Converts each string in the input Tensor to the specified numeric type.
    zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)
    return zip_code
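As a quick check of the intended mapping (placeholders become zeros, missing values become zero), the same logic can be written for a single Python string. This is a sketch for illustration only; `convert_zip_code_py` is a hypothetical name and is not part of the pipeline.

```python
import re


def convert_zip_code_py(zip_code):
    """Plain-Python counterpart of the zip code conversion, for one string."""
    if zip_code == "":
        zip_code = "00000"
    # Replace each placeholder character with a zero, e.g. "97XXX" -> "97000".
    zip_code = re.sub(r"X", "0", zip_code)
    return float(zip_code)


converted = [convert_zip_code_py(z) for z in ["97XXX", "98XXX", ""]]
```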
With all the helper functions in place, we can now loop over each feature column and transform it depending on its type. For example, for the features to be converted to one-hot features, we convert the category names to an index with tft.compute_and_apply_vocabulary() and then convert the index to a one-hot vector representation with our helper function convert_num_to_one_hot(). Since we are using tft.compute_and_apply_vocabulary(), TensorFlow Transform will first loop over all categories and then determine a complete category-to-index mapping. This mapping will then be applied during the evaluation and serving phases of the model:
def preprocessing_fn(inputs):
    outputs = {}
    for key in ONE_HOT_FEATURES.keys():
        dim = ONE_HOT_FEATURES[key]
        int_value = tft.compute_and_apply_vocabulary(
            fill_in_missing(inputs[key]), top_k=dim + 1
        )
        outputs[transformed_name(key)] = convert_num_to_one_hot(
            int_value, num_labels=dim + 1
        )
    # The bucket and text features shown next are added here as well.
    return outputs
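The two steps described above (one pass to build the mapping, then a lookup per record) can be sketched in plain Python. This is a conceptual illustration, not TFT's implementation; the function names are hypothetical, and the fallback index of -1 for unseen categories is an assumption of this sketch.

```python
from collections import Counter


def compute_vocabulary(values, top_k):
    """Analysis pass: map the top_k most frequent values to indices 0..top_k-1."""
    most_common = Counter(values).most_common(top_k)
    return {value: index for index, (value, _) in enumerate(most_common)}


def apply_vocabulary(value, vocabulary, default_index=-1):
    """Transform step: look up one value; unseen values get default_index."""
    return vocabulary.get(value, default_index)


training_products = ["Mortgage", "Mortgage", "Mortgage", "Loan", "Loan", "Card"]
vocabulary = compute_vocabulary(training_products, top_k=2)

indices = [apply_vocabulary(p, vocabulary) for p in ["Loan", "Mortgage", "Card"]]
```

Because the mapping is computed once and then reused, a category seen at serving time receives the same index it had during training.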
Our processing of the bucket features is very similar. We decided to bucketize the zip codes because one-hot encoded zip codes seemed too sparse. Each feature is bucketized into, in our case, 10 buckets, and we encode the index of the bucket as a one-hot vector:
    for key, bucket_count in BUCKET_FEATURES.items():
        temp_feature = tft.bucketize(
            convert_zip_code(fill_in_missing(inputs[key])),
            bucket_count,
            always_return_num_quantiles=False)
        outputs[transformed_name(key)] = convert_num_to_one_hot(
            temp_feature, num_labels=bucket_count + 1
        )
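Bucketization follows the same analyze-then-apply pattern: boundaries are derived from the full dataset, and each value is then assigned the index of its bucket. The plain-Python sketch below picks exact boundaries from the sorted data, which is an assumption made for readability and not how TFT computes them; the function names and sample values are hypothetical.

```python
import bisect


def compute_boundaries(values, num_buckets):
    """Analysis pass: pick num_buckets - 1 boundaries from the sorted data."""
    ordered = sorted(values)
    return [
        ordered[(len(ordered) * i) // num_buckets]
        for i in range(1, num_buckets)
    ]


def bucketize(value, boundaries):
    """Transform step: index of the bucket that the value falls into."""
    return bisect.bisect_right(boundaries, value)


training_zips = [0.0, 10000.0, 20000.0, 30000.0, 97000.0, 98000.0, 98100.0, 99000.0]
boundaries = compute_boundaries(training_zips, num_buckets=4)
bucket_indices = [bucketize(z, boundaries) for z in [5000.0, 30000.0, 98000.0, 99999.0]]
```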
Our text input features as well as our label column don't require any transformations; therefore, we simply convert them to dense features in case a feature is sparse:
    for key in TEXT_FEATURES.keys():
        outputs[transformed_name(key)] = fill_in_missing(inputs[key])
    outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])
If we use the Transform component from TFX in our pipeline, it expects the transformation code to be provided in a separate Python file. The name of the module file can be set by the user (in our case, transform.py), but the entry point preprocessing_fn() must be contained in the module file, and the function cannot be renamed.
The label key and feature dictionaries used in the preprocessing code are defined as follows:
LABEL_KEY = 'consumer_disputed'
# Feature name, feature dimensionality
ONE_HOT_FEATURES = {
    "product": 11, "sub_product": 45, "company_response": 5,
    "state": 60, "issue": 90
}
# Feature name, bucket count
BUCKET_FEATURES = {"zip_code": 10}
# Feature name, value is unused
TEXT_FEATURES = {"consumer_complaint_narrative": None}
The module file is then passed to the Transform component:
import os
from tfx.components import Transform

# example_gen, schema_gen, context, and base_dir come from the earlier pipeline steps.
transform_file = os.path.join(base_dir, 'components/transform.py')
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=transform_file
)
context.run(transform)
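Since the entry point has to carry the exact name preprocessing_fn(), a misnamed function would typically surface only once the pipeline runs. A small pre-flight check can catch this earlier. The following is a sketch using only the standard library; `load_preprocessing_fn` is a hypothetical helper and not part of TFX.

```python
import importlib.util
import os
import tempfile


def load_preprocessing_fn(module_path):
    """Load a module file and return its preprocessing_fn entry point."""
    spec = importlib.util.spec_from_file_location("transform_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    entry_point = getattr(module, "preprocessing_fn", None)
    if not callable(entry_point):
        raise ValueError(f"{module_path} does not define preprocessing_fn()")
    return entry_point


# Demonstration with a throwaway module file that passes inputs through unchanged.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "transform.py")
    with open(demo_path, "w") as handle:
        handle.write("def preprocessing_fn(inputs):\n    return dict(inputs)\n")
    demo_fn = load_preprocessing_fn(demo_path)
    demo_result = demo_fn({"state": "CA"})
```

In a real project the check would point at the actual module file instead of a temporary one.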