Closed luischinchillagarcia closed 3 years ago
I see 2 initial issues with the way the analyzer is currently used: 1) The return value of calling tft.ptransform_analyzer returns a list of outputs (as mentioned by the documentation as well), in order to get access to the tensor output of your single output you'll need to do something like:
ptransform_analyzer_outputs = tft.ptransform_analyzer(...)
outputs_dict['negative_samples'] = ptransform_analyzer_outputs[0]
2) The output shape given to the ptransform_analyzer should be in your case [[None]] since you have one output, and some number of rows in the output.
Beyond that, once you get the pipeline working, you should be able to use Beam operations to achieve groupby, you'll just need to format the outputs accordingly.
Thank you for the quick response. We ran into further issues that we couldn't find any solutions to. These seem to be related to the way we are passing the ptransform
parameter for tft.ptransform_analyzer
. If possible, would it be possible to provide an example defining a simple PTransform and applying it to tft.ptransform_analyzer
.
Here were the two errors we encountered when attempting to do this in this Colab notebook:
beam.PTransform
- We get an issue relating to singletons. As a test case we are trying to apply a basic pass-through function that just returns the input (or something similar).ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "_TensorBinding(value=(array([3]),), tensor_name='class_testing/Placeholder:0', is_asset_filepath=False)", "_TensorBinding(value=(array([3, 3]),), tensor_name='class_testing/Placeholder:0', is_asset_filepath=False)". [while running 'AnalyzeAndTransformDataset/AnalyzeDataset/CreateSavedModel/BindTensors']
beam.DoFn
- When subclassing, this returns the error below. This is because the argument ptransform
seems to accept ONLY a beam.ptransform
subclass.TypeError: unsupported operand type(s) for >>: 'str' and 'TestClass1'
Yes, the parameter to the ptransform_analyzer must be a beam PTransform. A Beam PTransform can compose Beam DoFns.
Here's a working example:
class TestClass1(beam.DoFn):
def __init_1(self, **kwargs):
super().__init__(**kwargs)
def process(self, row, **kwargs):
return row
class TestClass2(beam.PTransform):
@staticmethod
def _flatten_fn(batch_values):
for (value,) in zip(batch_values):
yield value
def expand(self, pcoll):
# Transform logic goes here.
def x_print(x):
print('x is: ', x)
return x
return (pcoll
| beam.Map(lambda x: x)
| beam.ParDo(TestClass1())
| beam.FlatMap(self._flatten_fn)
| 'print1' >> beam.Map(x_print)
| beam.CombineGlobally(sum)
)
You'll notice that TestClass2 (PTransform) is applying TestClass1 (DoFn) to the data. You'll also notice that TestClass2 also reduces the data to a single output as the ptransform_analyzer parameters indicated (output_dtypes, output_shapes), your example returned a multi-element PCollection which is why you were getting the "PCollection of size 2" error.
When using TestClass2 in your preprocessing_fn, you'll also have to make sure that you're returning a batched value correctly. As this PTransform returns a single scalar, it would have to be broadcasted or reshaped somehow. The easiest way for me to demonstrate is to add it to the original input s:
def preprocessing_fn(inputs):
"""Preprocess input columns into transformed columns."""
outputs_dict = {}
# Doing normal tf operations within preproc works!
x = inputs['clientId']
y = inputs['itemId']
s = inputs['label']
outputs_dict = {}
square_s = tf.math.square(s)
# We add to s here because the output must be batched just like s.
# There are other ways of achieving this but this is easy for now.
outputs_dict['negative_samples'] = s + tft.ptransform_analyzer(
inputs=[
square_s,
],
output_dtypes=[tf.int64],
output_shapes=[[None]],
ptransform=TestClass2(),
name='class_testing'
)[0]
return outputs_dict
Hope this helps.
Hi Zohar,
Thanks for providing this example, it's been super helpful! Luis and I have been pulling it apart to see how we can adapt it to our needs.
We'd like to go beyond returning a single value and return an element such as an array that we can use as a feature in our outputs_dict. In this colab we've tried to do just that, but we get a duplication of the results. We're guessing this is due to batching within the beam pipeline, however we're unsure how to prevent it and still return a single array.
Our goal is to use ptransform_analyzer
within a TFX transform component's preprocessing_fn
. For this particular project, we'd like to apply a GroupByKey
, but ultimately it would be helpful to apply any user-defined composite beam transforms not exposed by the TFT api. One of our concerns is that we might not be able to serialize the subclassed ptransform
and we'll be right back where we started.
Does this seem like a viable course of action, both in the context of a TFX pipeline and as a stand-alone usage of ptransform_analyzer
? If not, what would be the best way to apply user-defined beam transforms that aren't available in the TFT api?
Can you clarify where the duplication of results are in your colab? Looking at the prints in the comments look like what I would expect.
Yes, you should be able to write a PTransform that uses GroupByKey to compute what you need. What would be the issue with producing a serializable PTransform?
If you'd like assistance in making your custom PTransform work, I'd suggest explicitly defining the inputs, desired compute, and output so that we can create a colab that implements the logic.
Hi Zohar,
I hope you and the rest of the TFT team are well.
I've updated our colab with the returned vs. expected results and more comments to try and outline our desired compute. We're working on getting preprocessing_fn2
to return a grouped list of itemIds for each userId.
We've been successful in returning a list of tensors from ptransform_analyzer
with tagged results from GroupByKey. While our pipeline works with small data, we're unsure how to proceed at scale when we're returning an output tensor for each user in our dataset. Ideally, we would want to return a single result from ptransform_analyzer
that could be unpacked for each user as a next step within the preprocessing_fn
.
One problem we've been having has been programmatically reshaping each output tensor. Since the tensor shape would be unknown and can't be computed with tf.shape
, tf.size
, etc we've only been able to properly reshape our output list of tensors into a single [1,None] tensor by explicitly defining the tensor shape in either ptransform_analyzer
or in the tf.reshape
args. Using tf.stack
and tf.concat
have also been unsuccessful. Any guidance on shape management would be helpful as this would go beyond broadcasting a single value across an existing tensor as in your example usage.
Luis and I tracked the result duplication we were observing to the pipeline runner's desired_batch_size
arg. Is there a way to ensure we only return a single result from tft_beam.AnalyzeAndTransformDataset
regardless of batch size?
Thanks for all your help trouble shooting this with us.
Thanks for the details, I'm still trying to fully understand the problem we're trying to solve here -
How many userIds should this handle?
These output_shapes work as a hardcoded workaround, but would need to be set dynamically for production
Based on what? is meant to be the size of unique user IDs?
Depending on the size, you may want to prefer to user a vocabulary based implementation, we have examples of this in tf.Transform (per-key analyzers that use a vocabulary), but they don't use the ptransform_analyzer
, they use the underlying infra instead. Nonetheless it may be a helpful referecnce:
https://github.com/tensorflow/transform/blob/0f46bccd06e6e893589440f10e545c148e0e05da/tensorflow_transform/analyzers.py#L186-L199
You'll notice that we don't have too much documentation about the ptransform_analyzer because it's a very advanced option that we prefer users to avoid unless absolutely necessary.
Regarding your question about shape, could you please expand on what your desired output of the preprocessing_fn would be?
Returning varying shapes from your preprocessing_fn is possible by returning a tf.SparseTensor
, it must be left-aligned (i.e. ragged). It can have a dense shape which is (None, max_values_count), and the number of values can vary. Would that work for your case?
Regarding your question about the batch_size:
Is there a way to ensure we only return a single result from tft_beam.AnalyzeAndTransformDataset regardless of batch size?
What does returning a single result mean here? TransformDataset is running a map on your input data, are you intending on just returning a single instance from it? because that would not be possible.
You could instead only call tft_beam.AnalyzeDataset
and do a 'custom' Transform workflow.
I've tried to pare things down to highlight the main challenges we've been encountering. I split up and simplified our workflow into two colabs, one that's beam specific and one for TFX.
This beam colab shows what we're trying to do when transforming toy data from {userId, itemId} interaction records to a grouped {userId, array[itemId]} feature.
In production usage, we're expecting to pass on the order of 1MM userIds, 20-50K itemIds, and 100MMs of interactions through our TFX pipeline. We're definitely open to other ways to handle this grouping task, but we were hoping to keep all the preprocessing steps in a single transform function for both serving consistency and scalability.
This tfx colab demonstrates the issue we encountered when trying to serialize a preprocessing_fn
that uses ptransform_analyzer
within a TFX transform component.
We feel like we're on the right track, just missing some key steps in managing the outputs of ptransform_analyzer
and getting the results into an output dict.
Given the size of your outputs, I suggest going with a vocabulary approach. Note that by this I don't mean you can use tft.vocabulary, but you can use a similar approach and see the vocabulary implementation as an example.
Essentially you would write the results of your computation to a text file, the ptransform_analyzer would then just have one output - the location of that file, and then you can load this file to a TensorFlow table to perform lookups based on your keys.
Regarding the "duplication" of the results, this is working as intended and there is no way around that. Since tf.Transform performs a mapping of the input features, we can't just output values that don't correspond to the input values. For your case I would suggest to output a vocabulary file from your ptransform_analyzer, and return a copy of the inputs as the output of the preprocessing_fn. Then the vocabulary file will be accessible using this API: https://www.tensorflow.org/tfx/transform/api_docs/python/tft/TFTransformOutput#vocabulary_file_by_name
There's another option that would let you avoid using the ptransform_analyzer but you'd have to use a private API: tft.analyzers. _apply_cacheable_combiner_per_key_large
.
This would allow you to do exactly what I described above as long as you define a combiner, it will be applied to your data using a beam.CombinePerKey.
Here is an example demonstrating both the use of _apply_cacheable_combiner_per_key_large and how to use the vocabulary afterwards: https://colab.research.google.com/drive/1FWP8CMoSXvJ8-R6BK7HKO7Yv4GWq0zHd?usp=sharing (I'm not sure if you'll be able to open this link, lmk if not and I can try and share it differently).
Regarding the pickling PTransform issue, this is currently an issue with ptransform_analyzer
in interactive python. There is a temporary workaround at the moment, if you add this snippet after writing the transform2.py file and before running the transform component:
import sys
sys.modules['user_module'] = transform2
Closing this due to inactivity.Please feel free to reopen.
In trying to apply
tft.ptransform_analyzer
to create a custom PTransform, it seems unclear how exactly to use this function (there doesn't seem to be any documentation with examples). Specifically, it is unclear as to how to actually create the PTransform asked for in the parameterptransform
.For example, let's say we want to take the columns
userId, itemID, label
and we would like to groupby userId to getuserId, itemID_list, label_list
. Is this possible to achieve usingtft.ptransform_analyzer
? Here is Colab file attempting different ways to achieve this goal.