tensorflow / transform

Input pipeline framework
Apache License 2.0

AnalyzeAndTransformDataset raises ValueError when asked to transform int to string #197

Closed tordbb closed 2 years ago

tordbb commented 4 years ago

I am using the following test code to try to define the column Stringed Ints as strings. The column contains ints, but I should be able to make TensorFlow treat them as strings. When running test_tutorial_schema(), I get the error tensorflow.python.framework.errors_impl.InternalError: Unsupported object type int. See the full log at the bottom. Am I doing something wrong, or is there really no way to control how a data column should be interpreted by TFX?

"""Unit tests for data_cleaner.py."""

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
import tensorflow as tf
import tempfile
import tensorflow_transform.beam as tft_beam

def test_tutorial_schema():

    raw_data = [
        {'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'},
        {'Ints': 2, 'Stringed Ints': 2, 'Strings': 'world'}
    ]

    cols = ['Ints', 'Stringed Ints', 'Strings']

    intended_types = [tf.float32, tf.string, tf.string]
    true_types = [tf.float32, bytes, bytes]

    raw_data_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec({
            cols[0]: tf.io.FixedLenFeature([], intended_types[0]),
            cols[1]: tf.io.FixedLenFeature([], intended_types[1]),
            cols[2]: tf.io.FixedLenFeature([], intended_types[2]),
        }))   

    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
            (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
                minimal_preprocessing_fn))

    transformed_data, transformed_metadata = transformed_dataset  # pylint: disable=unused-variable

def minimal_preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    my_ints = inputs['Ints']
    my_stringed_ints = inputs['Stringed Ints']
    my_strings = inputs['Strings']

    return {
        'Ints': my_ints,
        'Stringed Ints': my_stringed_ints,
        'Strings': my_strings
    }
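
One possible workaround, sketched here as an assumption rather than a confirmed fix: the log below shows the int value 1 being fed directly to the string placeholder for Stringed Ints, so converting those values to Python strings before the data reaches AnalyzeAndTransformDataset should avoid the failing feed. The helper name stringify_columns is hypothetical and is not part of tensorflow_transform.

```python
def stringify_columns(rows, columns):
    """Return copies of `rows` with the values of `columns` converted to str.

    Hypothetical helper for illustration; not a tensorflow_transform API.
    The input rows are left unmodified.
    """
    return [
        {key: (str(value) if key in columns else value)
         for key, value in row.items()}
        for row in rows
    ]


raw_data = [
    {'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'},
    {'Ints': 2, 'Stringed Ints': 2, 'Strings': 'world'},
]

# Only 'Stringed Ints' is declared as tf.string while actually holding ints.
clean_data = stringify_columns(raw_data, columns={'Stringed Ints'})
print(clean_data[0])
# → {'Ints': 1, 'Stringed Ints': '1', 'Strings': 'hello'}
```

The resulting clean_data would then take the place of raw_data in the (raw_data, raw_data_metadata) tuple. Another option, not shown here, is to declare the column as tf.int64 in the feature spec and convert it inside the preprocessing function with tf.strings.as_string.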

(The visible bottom part of) error message:

========================= FAILURES =========================
___________________ test_tutorial_schema ___________________

self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
fn = <function BaseSession._do_run.<locals>._run_fn at 0x7f6dfcfc5050>
args = ({<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd0369f0>: array([1.], dtype=float32), <tensorflow.p... object at 0x7f6dfd036ef0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfcfbe0b0>], [], None, None)
message = 'Unsupported object type int', m = None

    def _do_call(self, fn, *args):
      try:
>       return fn(*args)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1365: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

feed_dict = {<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd0369f0>: array([1.], dtype=float32), <tensorflow.py...ype=object), <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036c30>: array(['hello'], dtype=object)}
fetch_list = [<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036df0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036ef0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfcfbe0b0>]
target_list = [], options = None, run_metadata = None

    def _run_fn(feed_dict, fetch_list, target_list, options, run_metadata):
      # Ensure any changes to the graph are reflected in the runtime.
      self._extend_graph()
      return self._call_tf_sessionrun(options, feed_dict, fetch_list,
>                                     target_list, run_metadata)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1350: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
options = None
feed_dict = {<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd0369f0>: array([1.], dtype=float32), <tensorflow.py...ype=object), <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036c30>: array(['hello'], dtype=object)}
fetch_list = [<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036df0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd036ef0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfcfbe0b0>]
target_list = [], run_metadata = None

    def _call_tf_sessionrun(self, options, feed_dict, fetch_list, target_list,
                            run_metadata):
      return tf_session.TF_SessionRun_wrapper(self._session, options, feed_dict,
                                              fetch_list, target_list,
>                                             run_metadata)
E     tensorflow.python.framework.errors_impl.InternalError: Unsupported object type int

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1443: InternalError

During handling of the above exception, another exception occurred:

self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7f6dfd026110>
batch = [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}]

    def _handle_batch(self, batch):
      self._update_metrics(batch)
      # Remove passthrough keys from the input data to make sure preprocessing_fn
      # won't see them. Making a copy of batch because mutating PCollection
      # elements is not allowed.
      # No need to remove (and cannot remove) the passthrough columns if
      # tfxio is used:
      # 1) The TensorAdapter expects the RecordBatch to be of the same schema as
      # statically determined by the TFXIO implementation the yields the
      # TensorAdapter.
      # 2) It's not possible to leak passthrough columns through TensorAdapter
      # because they are not going to be converted to Tensors.
      passthrough_data = None
      if self._passthrough_keys and not self._use_tfxio:
        batch = [copy.copy(x) for x in batch]
        passthrough_data = {
            key: [instance.pop(key) for instance in batch
                 ] for key in self._passthrough_keys
        }

      feed_list = self._make_feed_list(batch)
      try:
>       outputs_list = self._graph_state.callable_get_outputs(*feed_list)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

feed_args = ([1], [1], ['hello']), kwargs = {}
feed_dict = {<tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>: [1], <tf.Tensor 'transform/inputs/F_Stringed_Ints:0' shape=(None,) dtype=string>: [1], <tf.Tensor 'transform/inputs/Strings:0' shape=(None,) dtype=string>: ['hello']}

    def _generic_run(*feed_args, **kwargs):
      feed_dict = {
          feed: feed_val for feed, feed_val in zip(feed_list, feed_args)
      }
>     return self.run(fetches, feed_dict=feed_dict, **kwargs)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1233: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
fetches = [<tf.Tensor 'transform/inputs/inputs/Ints_copy:0' shape=(None,) dtype=float32>, <tf.Tensor 'transform/inputs/inputs/F_...s_copy:0' shape=(None,) dtype=string>, <tf.Tensor 'transform/inputs/inputs/Strings_copy:0' shape=(None,) dtype=string>]
feed_dict = {<tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>: [1], <tf.Tensor 'transform/inputs/F_Stringed_Ints:0' shape=(None,) dtype=string>: [1], <tf.Tensor 'transform/inputs/Strings:0' shape=(None,) dtype=string>: ['hello']}
options = None, run_metadata = None

    def run(self, fetches, feed_dict=None, options=None, run_metadata=None):
      """Runs operations and evaluates tensors in `fetches`.

      This method runs one "step" of TensorFlow computation, by
      running the necessary graph fragment to execute every `Operation`
      and evaluate every `Tensor` in `fetches`, substituting the values in
      `feed_dict` for the corresponding input values.

      The `fetches` argument may be a single graph element, or an arbitrarily
      nested list, tuple, namedtuple, dict, or OrderedDict containing graph
      elements at its leaves.  A graph element can be one of the following types:

      * A `tf.Operation`.
        The corresponding fetched value will be `None`.
      * A `tf.Tensor`.
        The corresponding fetched value will be a numpy ndarray containing the
        value of that tensor.
      * A `tf.sparse.SparseTensor`.
        The corresponding fetched value will be a
        `tf.compat.v1.SparseTensorValue`
        containing the value of that sparse tensor.
      * A `get_tensor_handle` op.  The corresponding fetched value will be a
        numpy ndarray containing the handle of that tensor.
      * A `string` which is the name of a tensor or operation in the graph.

      The value returned by `run()` has the same shape as the `fetches` argument,
      where the leaves are replaced by the corresponding values returned by
      TensorFlow.

      Example:

      ```python
         a = tf.constant([10, 20])
         b = tf.constant([1.0, 2.0])
         # 'fetches' can be a singleton
         v = session.run(a)
         # v is the numpy array [10, 20]
         # 'fetches' can be a list.
         v = session.run([a, b])
         # v is a Python list with 2 numpy arrays: the 1-D array [10, 20] and the
         # 1-D array [1.0, 2.0]
         # 'fetches' can be arbitrary lists, tuples, namedtuple, dicts:
         MyData = collections.namedtuple('MyData', ['a', 'b'])
         v = session.run({'k1': MyData(a, b), 'k2': [b, a]})
         # v is a dict with
         # v['k1'] is a MyData namedtuple with 'a' (the numpy array [10, 20]) and
         # 'b' (the numpy array [1.0, 2.0])
         # v['k2'] is a list with the numpy array [1.0, 2.0] and the numpy array
         # [10, 20].
      ```

  The optional `feed_dict` argument allows the caller to override
  the value of tensors in the graph. Each key in `feed_dict` can be
  one of the following types:

  * If the key is a `tf.Tensor`, the
    value may be a Python scalar, string, list, or numpy ndarray
    that can be converted to the same `dtype` as that
    tensor. Additionally, if the key is a
    `tf.compat.v1.placeholder`, the shape of
    the value will be checked for compatibility with the placeholder.
  * If the key is a
    `tf.sparse.SparseTensor`,
    the value should be a
    `tf.compat.v1.SparseTensorValue`.
  * If the key is a nested tuple of `Tensor`s or `SparseTensor`s, the value
    should be a nested tuple with the same structure that maps to their
    corresponding values as above.

  Each value in `feed_dict` must be convertible to a numpy array of the dtype
  of the corresponding key.

  The optional `options` argument expects a [`RunOptions`] proto. The options
  allow controlling the behavior of this particular step (e.g. turning tracing
  on).

  The optional `run_metadata` argument expects a [`RunMetadata`] proto. When
  appropriate, the non-Tensor output of this step will be collected there. For
  example, when users turn on tracing in `options`, the profiled info will be
  collected into this argument and passed back.

  Args:
    fetches: A single graph element, a list of graph elements, or a dictionary
      whose values are graph elements or lists of graph elements (described
      above).
    feed_dict: A dictionary that maps graph elements to values (described
      above).
    options: A [`RunOptions`] protocol buffer
    run_metadata: A [`RunMetadata`] protocol buffer

  Returns:
    Either a single value if `fetches` is a single graph element, or
    a list of values if `fetches` is a list, or a dictionary with the
    same keys as `fetches` if that is a dictionary (described above).
    Order in which `fetches` operations are evaluated inside the call
    is undefined.

  Raises:
    RuntimeError: If this `Session` is in an invalid state (e.g. has been
      closed).
    TypeError: If `fetches` or `feed_dict` keys are of an inappropriate type.
    ValueError: If `fetches` or `feed_dict` keys are invalid or refer to a
      `Tensor` that doesn't exist.
  """
  options_ptr = tf_session.TF_NewBufferFromString(
      compat.as_bytes(options.SerializeToString())) if options else None
  run_metadata_ptr = tf_session.TF_NewBuffer() if run_metadata else None

  try:
    result = self._run(None, fetches, feed_dict, options_ptr,
                     run_metadata_ptr)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:958:


self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
handle = None
fetches = [<tf.Tensor 'transform/inputs/inputs/Ints_copy:0' shape=(None,) dtype=float32>, <tf.Tensor 'transform/inputs/inputs/F_...s_copy:0' shape=(None,) dtype=string>, <tf.Tensor 'transform/inputs/inputs/Strings_copy:0' shape=(None,) dtype=string>]
feed_dict = {<tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>: [1], <tf.Tensor 'transform/inputs/F_Stringed_Ints:0' shape=(None,) dtype=string>: [1], <tf.Tensor 'transform/inputs/Strings:0' shape=(None,) dtype=string>: ['hello']}
options = None, run_metadata = None

def _run(self, handle, fetches, feed_dict, options, run_metadata):
  """Perform either run or partial_run, depending the presence of `handle`."""

  def _feed_fn(feed, feed_val):
    for tensor_type, _, feed_fn, _ in _REGISTERED_EXPANSIONS:
      if isinstance(feed, tensor_type):
        return feed_fn(feed, feed_val)
    raise TypeError('Feed argument %r has invalid type %r' %
                    (feed, type(feed)))

  # Check session.
  if self._closed:
    raise RuntimeError('Attempted to use a closed Session.')
  if self.graph.version == 0:
    raise RuntimeError('The Session graph is empty.  Add operations to the '
                       'graph before calling run().')

  # Create request.
  feed_dict_tensor = {}
  feed_map = {}

  # Validate and process feed_dict.
  feed_handles = {}
  if feed_dict:
    feed_dict = nest.flatten_dict_items(feed_dict)
    for feed, feed_val in feed_dict.items():
      for subfeed, subfeed_val in _feed_fn(feed, feed_val):
        try:
          subfeed_t = self.graph.as_graph_element(
              subfeed, allow_tensor=True, allow_operation=False)
        except Exception as e:
          raise TypeError('Cannot interpret feed_dict key as Tensor: ' +
                          e.args[0])

        if isinstance(subfeed_val, ops.Tensor):
          raise TypeError('The value of a feed cannot be a tf.Tensor object. '
                          'Acceptable feed values include Python scalars, '
                          'strings, lists, numpy ndarrays, or TensorHandles. '
                          'For reference, the tensor object was ' +
                          str(feed_val) + ' which was passed to the '
                          'feed with key ' + str(feed) + '.')

        subfeed_dtype = subfeed_t.dtype.as_numpy_dtype
        if isinstance(subfeed_val, int) and _convert_to_numpy_obj(
            subfeed_dtype, subfeed_val) != subfeed_val:
          raise TypeError(
              'Type of feed value ' + str(subfeed_val) + ' with type ' +
              str(type(subfeed_val)) +
              ' is not compatible with Tensor type ' + str(subfeed_dtype) +
              '. Try explicitly setting the type of the feed tensor'
              ' to a larger type (e.g. int64).')

        is_tensor_handle_feed = isinstance(subfeed_val,
                                           session_ops.TensorHandle)
        if is_tensor_handle_feed:
          np_val = subfeed_val.to_numpy_array()
          feed_handles[subfeed_t.ref()] = subfeed_val
        else:
          np_val = np.asarray(subfeed_val, dtype=subfeed_dtype)

        if (not is_tensor_handle_feed and
            not subfeed_t.get_shape().is_compatible_with(np_val.shape)):
          raise ValueError(
              'Cannot feed value of shape %r for Tensor %r, '
              'which has shape %r' %
              (np_val.shape, subfeed_t.name, str(subfeed_t.get_shape())))
        if not self.graph.is_feedable(subfeed_t):
          raise ValueError('Tensor %s may not be fed.' % subfeed_t)

        feed_dict_tensor[subfeed_t.ref()] = np_val
        feed_map[compat.as_bytes(subfeed_t.name)] = (subfeed_t, subfeed_val)

  # Create a fetch handler to take care of the structure of fetches.
  fetch_handler = _FetchHandler(
      self._graph, fetches, feed_dict_tensor, feed_handles=feed_handles)

  # Run request and get response.
  # We need to keep the returned movers alive for the following _do_run().
  # These movers are no longer needed when _do_run() completes, and
  # are deleted when `movers` goes out of scope when this _run() ends.
  # TODO(yuanbyu, keveman): Revisit whether we should just treat feeding
  # of a handle from a different device as an error.
  _ = self._update_with_movers(feed_dict_tensor, feed_map)
  final_fetches = fetch_handler.fetches()
  final_targets = fetch_handler.targets()
  # We only want to really perform the run if fetches or targets are provided,
  # or if the call is a partial run that specifies feeds.
  if final_fetches or final_targets or (handle and feed_dict_tensor):
    results = self._do_run(handle, final_targets, final_fetches,
                         feed_dict_tensor, options, run_metadata)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1181:


self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
handle = None, target_list = []
fetch_list = [<tf.Tensor 'transform/inputs/inputs/Ints_copy:0' shape=(None,) dtype=float32>, <tf.Tensor 'transform/inputs/inputs/F_...s_copy:0' shape=(None,) dtype=string>, <tf.Tensor 'transform/inputs/inputs/Strings_copy:0' shape=(None,) dtype=string>]
feed_dict = {<Reference wrapping <tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>>: array([1.], dtype=float32), <...eference wrapping <tf.Tensor 'transform/inputs/Strings:0' shape=(None,) dtype=string>>: array(['hello'], dtype=object)}
options = None, run_metadata = None

def _do_run(self, handle, target_list, fetch_list, feed_dict, options,
            run_metadata):
  """Runs a step based on the given fetches and feeds.

  Args:
    handle: a handle for partial_run. None if this is just a call to run().
    target_list: A list of operations to be run, but not fetched.
    fetch_list: A list of tensors to be fetched.
    feed_dict: A dictionary that maps tensors to numpy ndarrays.
    options: A (pointer to a) [`RunOptions`] protocol buffer, or None
    run_metadata: A (pointer to a) [`RunMetadata`] protocol buffer, or None

  Returns:
    A list of numpy ndarrays, corresponding to the elements of
    `fetch_list`.  If the ith element of `fetch_list` contains the
    name of an operation, the first Tensor output of that operation
    will be returned for that element.

  Raises:
    tf.errors.OpError: Or one of its subclasses on error.
  """
  # pylint: disable=protected-access
  feeds = dict((t.deref()._as_tf_output(), v) for t, v in feed_dict.items())
  fetches = [t._as_tf_output() for t in fetch_list]
  targets = [op._c_op for op in target_list]

  # pylint: enable=protected-access

  def _run_fn(feed_dict, fetch_list, target_list, options, run_metadata):
    # Ensure any changes to the graph are reflected in the runtime.
    self._extend_graph()
    return self._call_tf_sessionrun(options, feed_dict, fetch_list,
                                    target_list, run_metadata)

  def _prun_fn(handle, feed_dict, fetch_list):
    if target_list:
      raise RuntimeError('partial_run() requires empty target_list.')
    return self._call_tf_sessionprun(handle, feed_dict, fetch_list)

  if handle is None:
    return self._do_call(_run_fn, feeds, fetches, targets, options,
                       run_metadata)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1359:


self = <tensorflow.python.client.session.Session object at 0x7f6dfd0362d0>
fn = <function BaseSession._do_run.<locals>._run_fn at 0x7f6dfcfc5050>
args = ({<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfd0369f0>: array([1.], dtype=float32), <tensorflow.p... object at 0x7f6dfd036ef0>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7f6dfcfbe0b0>], [], None, None)
message = 'Unsupported object type int', m = None

def _do_call(self, fn, *args):
  try:
    return fn(*args)
  except errors.OpError as e:
    message = compat.as_text(e.message)
    m = BaseSession._NODEDEF_NAME_RE.search(message)
    node_def = None
    op = None
    if m is not None:
      node_name = m.group(3)
      try:
        op = self._graph.get_operation_by_name(node_name)
        node_def = op.node_def
      except KeyError:
        pass
    message = error_interpolation.interpolate(message, self._graph)
    if 'only supports NHWC tensor format' in message:
      message += ('\nA possible workaround: Try disabling Grappler optimizer'
                  '\nby modifying the config for creating the session eg.'
                  '\nsession_config.graph_options.rewrite_options.'
                  'disable_meta_optimizer = True')
    raise type(e)(node_def, op, message)

E tensorflow.python.framework.errors_impl.InternalError: Unsupported object type int

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1384: InternalError

During handling of the above exception, another exception occurred:

???

apache_beam/runners/common.py:961:


???

apache_beam/runners/common.py:726:


???

apache_beam/runners/common.py:807:


???

apache_beam/runners/common.py:1095:


self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7f6dfd026110>
batch = [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}]
saved_model_dir = '/tmp/tmp6_61mrmc/tftransform_tmp/80c7f3af53944ba083f8cc99bb67f9d5'

def process(self, batch, saved_model_dir):
  """Runs the given graph to realize the output `Tensor` or `SparseTensor`s.

  Runs the graph in a TF session for computing the output values of the
  `Tensor` or `SparseTensor`s, given an input row of data (input `Tensor` or
  `SparseTensor`s).

  Args:
    batch: the batch of elements being processed by the DoFn
    saved_model_dir: Directory containing saved model.

  Yields:
    A representation of output features as a dict mapping keys (logical column
    names) to values.
  """
  if self._graph_state is None:
    # If available, acquire will return a cached _GraphState, since calling
    # _make_graph_state is expensive.
    self._graph_state = self._shared_graph_state_handle.acquire(
        lambda: self._make_graph_state(saved_model_dir))

  # This should remain true throughout the lifetime of this DoFn, regardless
  # of whether or not self._graph_state was cached.
  assert self._graph_state.saved_model_dir == saved_model_dir
  yield self._handle_batch(batch)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:422:


self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7f6dfd026110>
batch = [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}]

def _handle_batch(self, batch):
  self._update_metrics(batch)
  # Remove passthrough keys from the input data to make sure preprocessing_fn
  # won't see them. Making a copy of batch because mutating PCollection
  # elements is not allowed.
  # No need to remove (and cannot remove) the passthrough columns if
  # tfxio is used:
  # 1) The TensorAdapter expects the RecordBatch to be of the same schema as
  # statically determined by the TFXIO implementation the yields the
  # TensorAdapter.
  # 2) It's not possible to leak passthrough columns through TensorAdapter
  # because they are not going to be converted to Tensors.
  passthrough_data = None
  if self._passthrough_keys and not self._use_tfxio:
    batch = [copy.copy(x) for x in batch]
    passthrough_data = {
        key: [instance.pop(key) for instance in batch
             ] for key in self._passthrough_keys
    }

  feed_list = self._make_feed_list(batch)
  try:
    outputs_list = self._graph_state.callable_get_outputs(*feed_list)
  except Exception as e:
    raise ValueError(
        """An error occured while trying to apply the transformation: "{}".
        Batch instances: {},
        Fetching the values for the following Tensor keys: {}.""".format(
          str(e), batch, self._graph_state.outputs_tensor_keys))

E ValueError: An error occured while trying to apply the transformation: "Unsupported object type int".
E Batch instances: [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}],
E Fetching the values for the following Tensor keys: ['Ints', 'Stringed Ints', 'Strings'].

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:367: ValueError

During handling of the above exception, another exception occurred:

def test_tutorial_schema():

    raw_data = [
        {'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'},
        {'Ints': 2, 'Stringed Ints': 2, 'Strings': 'world'}
    ]

    cols = ['Ints', 'Stringed Ints', 'Strings']

    intended_types = [tf.float32, tf.string, tf.string]
    true_types = [tf.float32, bytes, bytes]

    raw_data_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec({
            cols[0]: tf.io.FixedLenFeature([], intended_types[0]),
            cols[1]: tf.io.FixedLenFeature([], intended_types[1]),
            cols[2]: tf.io.FixedLenFeature([], intended_types[2]),
        }))

    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
            (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
              minimal_preprocessing_fn))

tests/unit/data_import/test_test.py:31:


../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py:567: in __ror__
    p.run().wait_until_finish()
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/pipeline.py:497: in run
    self._options).run(False)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/pipeline.py:510: in run
    return self.runner.run_pipeline(self, self._options)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py:130: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:179: in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:189: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:335: in run_stages
    bundle_context_manager,
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:545: in _run_stage
    expected_timer_output)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1051: in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:598: in result_iterator
    yield fs.pop().result()
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:435: in result
    return self.__get_result()
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:384: in __get_result
    raise self._exception
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py:44: in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1048: in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:947: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:349: in push
    response = self.worker.do_instruction(request)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
    getattr(request, request_type), request.instruction_id)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:506: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py:972: in process_bundle
    element.data)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py:218: in process_encoded
    self.output(decoded_value)
apache_beam/runners/worker/operations.py:330: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:332: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1045: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
../miniconda/envs/tfx/lib/python3.7/site-packages/future/utils/__init__.py:446: in raise_with_traceback
    raise exc.with_traceback(traceback)
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:726: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:807: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
apache_beam/runners/common.py:1095: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:422: in process
    yield self._handle_batch(batch)


self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7f6dfd026110>
batch = [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}]

def _handle_batch(self, batch):
  self._update_metrics(batch)
  # Remove passthrough keys from the input data to make sure preprocessing_fn
  # won't see them. Making a copy of batch because mutating PCollection
  # elements is not allowed.
  # No need to remove (and cannot remove) the passthrough columns if
  # tfxio is used:
  # 1) The TensorAdapter expects the RecordBatch to be of the same schema as
  # statically determined by the TFXIO implementation the yields the
  # TensorAdapter.
  # 2) It's not possible to leak passthrough columns through TensorAdapter
  # because they are not going to be converted to Tensors.
  passthrough_data = None
  if self._passthrough_keys and not self._use_tfxio:
    batch = [copy.copy(x) for x in batch]
    passthrough_data = {
        key: [instance.pop(key) for instance in batch
             ] for key in self._passthrough_keys
    }

  feed_list = self._make_feed_list(batch)
  try:
    outputs_list = self._graph_state.callable_get_outputs(*feed_list)
  except Exception as e:
    raise ValueError(
        """An error occured while trying to apply the transformation: "{}".
        Batch instances: {},
        Fetching the values for the following Tensor keys: {}.""".format(
          str(e), batch, self._graph_state.outputs_tensor_keys))

E ValueError: An error occured while trying to apply the transformation: "Unsupported object type int".
E Batch instances: [{'Ints': 1, 'Stringed Ints': 1, 'Strings': 'hello'}],
E Fetching the values for the following Tensor keys: ['Ints', 'Stringed Ints', 'Strings']. [while running 'AnalyzeAndTransformDataset/TransformDataset/Transform']

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:367: ValueError
------------------- Captured stderr call -------------------
WARNING:tensorflow:From /home/vagrant/tf-template/tests/unit/data_import/test_test.py:25: from_feature_spec (from tensorflow_transform.tf_metadata.dataset_schema) is deprecated and will be removed in a future version.
Instructions for updating:
from_feature_spec is a deprecated, use schema_utils.schema_from_feature_spec
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
2020-08-19 06:37:54.207538: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2020-08-19 06:37:54.207574: W tensorflow/stream_executor/cuda/cuda_driver.cc:312] failed call to cuInit: UNKNOWN ERROR (303)
2020-08-19 06:37:54.207597: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (vagrant): /proc/driver/nvidia/version does not exist
2020-08-19 06:37:54.207780: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations: AVX2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2020-08-19 06:37:54.212731: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2592000000 Hz
2020-08-19 06:37:54.212936: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5562962cd0e0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-08-19 06:37:54.212953: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
WARNING:tensorflow:From /home/vagrant/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
WARNING:tensorflow:From /home/vagrant/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/tf_utils.py:220: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use ref() instead.
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
==================== 1 failed in 2.06s ====================

zoyahav commented 4 years ago

Your raw_data_metadata should describe your raw_data as it actually is, not how it's intended to be. Your options are:

  1. Specify 'Stringed Ints' as tf.int64, since the raw_data type is integer. Then, in the preprocessing_fn, pass it to tf.strings.as_string(). The result will have dtype tf.string.
  2. Convert this feature to string in your ExampleGen.

Option (1) is usually preferred, because this logic is then applied both at preprocessing for training and at inference, which helps avoid training/inference skew.
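
For illustration, option (1) could look roughly like the sketch below. It assumes the column names from the original test; `raw_feature_spec`, `batch` and `transformed` are names made up for this example, and the function is called eagerly on hand-built tensors rather than through `tft_beam.AnalyzeAndTransformDataset`, only to show the dtype change.

```python
import tensorflow as tf


def preprocessing_fn(inputs):
    """Convert the integer column to strings; pass other columns through."""
    outputs = dict(inputs)  # shallow copy so the input dict is not mutated
    outputs['Stringed Ints'] = tf.strings.as_string(inputs['Stringed Ints'])
    return outputs


# The raw feature spec describes the data as it actually is:
# 'Stringed Ints' holds integers, so it is declared as tf.int64.
raw_feature_spec = {
    'Ints': tf.io.FixedLenFeature([], tf.float32),
    'Stringed Ints': tf.io.FixedLenFeature([], tf.int64),
    'Strings': tf.io.FixedLenFeature([], tf.string),
}

# Eager check outside of Beam (illustrative only): build a small batch
# by hand and confirm the output column now has dtype tf.string.
batch = {
    'Ints': tf.constant([1.0, 2.0], dtype=tf.float32),
    'Stringed Ints': tf.constant([1, 2], dtype=tf.int64),
    'Strings': tf.constant(['hello', 'world']),
}
transformed = preprocessing_fn(batch)
print(transformed['Stringed Ints'].dtype)
```

Because the conversion happens inside the preprocessing_fn, the same cast would be part of the transform graph used at serving time.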

tordbb commented 4 years ago

Hi Zoyahav,

Thank you for reaching out, this is highly appreciated!

Below is my failing attempt at creating such a preprocessing_fn, but first, a higher-level question: am I approaching this problem in the wrong way?

My main goal is to change how a column should be interpreted. Two possible scenarios:

What is the best way to do this using TensorFlow, so that the data is formatted correctly in ExampleGen, SchemaGen, Trainer, etc.?

If you think the best way involves changing types using tft_beam.AnalyzeAndTransformDataset(preprocessing_fn), then I would very much appreciate any thoughts on why this new piece of code fails:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
import tensorflow as tf
import tempfile
import tensorflow_transform.beam as tft_beam
import pdb
import warnings

def test_tutorial_schema():

    cols = ["Ints", "Ints to Strings", "Strings to Ints", "Strings"]
    raw_data = [
        {cols[0]: 1, cols[1]: 1, cols[2]: "3", cols[3]: "hello"},
        {cols[0]: 2, cols[1]: 2, cols[2]: "4", cols[3]: "world"},
    ]

    original_types = [tf.float32, tf.float32, tf.string, tf.string]
    intended_types = [tf.float32, tf.string, tf.float32, tf.string]

    # Todo: generate these automatically based on intended_types
    true_types = [tf.float32, bytes, tf.float32, bytes]
    wrong_types = [bytes, tf.float32, bytes, tf.float32]

    # Todo: make this independent of the length of cols and original_types. For loop?
    raw_data_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec(
            {
                cols[0]: tf.io.FixedLenFeature([], original_types[0]),
                cols[1]: tf.io.FixedLenFeature([], original_types[1]),
                cols[2]: tf.io.FixedLenFeature([], original_types[2]),
                cols[3]: tf.io.FixedLenFeature([], original_types[3]),
            }
        )
    )

    # Generate a preprocessing fn having intended_types internalized
    preprocessing_fn = generate_preprocessing_fn(intended_types)

    # Run the preprocessing_fn over the raw data
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        transformed_dataset, transform_fn = (
            raw_data,
            raw_data_metadata,
        ) | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)

    transformed_data, transformed_metadata = transformed_dataset

    for col, typ, wrong in zip(cols, true_types, wrong_types):
        # import pdb
        # pdb.set_trace()
        warnings.warn(
            f"\n\n: : {transformed_data[0][col]}: {type(transformed_data[0][col])}"
        )
        assert type(transformed_data[0][col]) == typ
        assert type(transformed_data[1][col]) == typ
        assert not type(transformed_data[0][col]) == wrong
        assert not type(transformed_data[1][col]) == wrong

def generate_preprocessing_fn(intended_types):
    """Return function which can alter column types."""

    def preprocessing_fn(inputs):
        """Preprocess input columns into transformed columns."""
        for num, col_name in enumerate(inputs):
            current_type = inputs[col_name].dtype
            if not current_type == intended_types[num]:
                warnings.warn(
                    f"\n\n: : num: {num}, "
                    f"\n: : col_name: {col_name}"
                    f"\n: : current_type: {current_type},"
                    f"\n: : intended_types[num]: {intended_types[num]}"
                    f"\n: : inputs[col_name][0]: {inputs[col_name][0]}"
                )
                if intended_types[num] == tf.string:
                    inputs[col_name] = tf.strings.as_string(inputs[col_name])
                else:
                    # pdb.set_trace()
                    inputs[col_name] = tf.strings.to_number(
                        inputs[col_name], intended_types[num]
                    )

        return inputs

    return preprocessing_fn

Giving the following error:

self = <tensorflow.python.client.session.Session object at 0x7fb030202290>
handle = None
fetches = [<tf.Tensor 'transform/inputs/inputs/Ints_copy:0' shape=(None,) dtype=float32>, <tf.Tensor 'transform/AsString:0' shap...hape=(None,) dtype=float32>, <tf.Tensor 'transform/inputs/inputs/F_Strings_to_Ints_copy:0' shape=(None,) dtype=string>]
feed_dict = {<tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>: [1], <tf.Tensor 'transform/inputs/F_Ints_to_String...(None,) dtype=string>: ['hello'], <tf.Tensor 'transform/inputs/F_Strings_to_Ints:0' shape=(None,) dtype=string>: ['3']}
options = None, run_metadata = None

    def _run(self, handle, fetches, feed_dict, options, run_metadata):
      """Perform either run or partial_run, depending the presence of `handle`."""

      def _feed_fn(feed, feed_val):
        for tensor_type, _, feed_fn, _ in _REGISTERED_EXPANSIONS:
          if isinstance(feed, tensor_type):
            return feed_fn(feed, feed_val)
        raise TypeError('Feed argument %r has invalid type %r' %
                        (feed, type(feed)))

      # Check session.
      if self._closed:
        raise RuntimeError('Attempted to use a closed Session.')
      if self.graph.version == 0:
        raise RuntimeError('The Session graph is empty.  Add operations to the '
                           'graph before calling run().')

      # Create request.
      feed_dict_tensor = {}
      feed_map = {}

      # Validate and process feed_dict.
      feed_handles = {}
      if feed_dict:
        feed_dict = nest.flatten_dict_items(feed_dict)
        for feed, feed_val in feed_dict.items():
          for subfeed, subfeed_val in _feed_fn(feed, feed_val):
            try:
              subfeed_t = self.graph.as_graph_element(
                  subfeed, allow_tensor=True, allow_operation=False)
            except Exception as e:
              raise TypeError('Cannot interpret feed_dict key as Tensor: ' +
                              e.args[0])

            if isinstance(subfeed_val, ops.Tensor):
              raise TypeError('The value of a feed cannot be a tf.Tensor object. '
                              'Acceptable feed values include Python scalars, '
                              'strings, lists, numpy ndarrays, or TensorHandles. '
                              'For reference, the tensor object was ' +
                              str(feed_val) + ' which was passed to the '
                              'feed with key ' + str(feed) + '.')

            subfeed_dtype = subfeed_t.dtype.as_numpy_dtype
            if isinstance(subfeed_val, int) and _convert_to_numpy_obj(
                subfeed_dtype, subfeed_val) != subfeed_val:
              raise TypeError(
                  'Type of feed value ' + str(subfeed_val) + ' with type ' +
                  str(type(subfeed_val)) +
                  ' is not compatible with Tensor type ' + str(subfeed_dtype) +
                  '. Try explicitly setting the type of the feed tensor'
                  ' to a larger type (e.g. int64).')

            is_tensor_handle_feed = isinstance(subfeed_val,
                                               session_ops.TensorHandle)
            if is_tensor_handle_feed:
              np_val = subfeed_val.to_numpy_array()
              feed_handles[subfeed_t.ref()] = subfeed_val
            else:
              np_val = np.asarray(subfeed_val, dtype=subfeed_dtype)

            if (not is_tensor_handle_feed and
                not subfeed_t.get_shape().is_compatible_with(np_val.shape)):
              raise ValueError(
                  'Cannot feed value of shape %r for Tensor %r, '
                  'which has shape %r' %
                  (np_val.shape, subfeed_t.name, str(subfeed_t.get_shape())))
            if not self.graph.is_feedable(subfeed_t):
              raise ValueError('Tensor %s may not be fed.' % subfeed_t)

            feed_dict_tensor[subfeed_t.ref()] = np_val
            feed_map[compat.as_bytes(subfeed_t.name)] = (subfeed_t, subfeed_val)

      # Create a fetch handler to take care of the structure of fetches.
      fetch_handler = _FetchHandler(
          self._graph, fetches, feed_dict_tensor, feed_handles=feed_handles)

      # Run request and get response.
      # We need to keep the returned movers alive for the following _do_run().
      # These movers are no longer needed when _do_run() completes, and
      # are deleted when `movers` goes out of scope when this _run() ends.
      # TODO(yuanbyu, keveman): Revisit whether we should just treat feeding
      # of a handle from a different device as an error.
      _ = self._update_with_movers(feed_dict_tensor, feed_map)
      final_fetches = fetch_handler.fetches()
      final_targets = fetch_handler.targets()
      # We only want to really perform the run if fetches or targets are provided,
      # or if the call is a partial run that specifies feeds.
      if final_fetches or final_targets or (handle and feed_dict_tensor):
        results = self._do_run(handle, final_targets, final_fetches,
>                              feed_dict_tensor, options, run_metadata)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1181: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow.python.client.session.Session object at 0x7fb030202290>
handle = None, target_list = []
fetch_list = [<tf.Tensor 'transform/inputs/inputs/Ints_copy:0' shape=(None,) dtype=float32>, <tf.Tensor 'transform/AsString:0' shap...hape=(None,) dtype=float32>, <tf.Tensor 'transform/inputs/inputs/F_Strings_to_Ints_copy:0' shape=(None,) dtype=string>]
feed_dict = {<Reference wrapping <tf.Tensor 'transform/inputs/Ints:0' shape=(None,) dtype=float32>>: array([1.], dtype=float32), <...ce wrapping <tf.Tensor 'transform/inputs/F_Strings_to_Ints:0' shape=(None,) dtype=string>>: array(['3'], dtype=object)}
options = None, run_metadata = None

    def _do_run(self, handle, target_list, fetch_list, feed_dict, options,
                run_metadata):
      """Runs a step based on the given fetches and feeds.

      Args:
        handle: a handle for partial_run. None if this is just a call to run().
        target_list: A list of operations to be run, but not fetched.
        fetch_list: A list of tensors to be fetched.
        feed_dict: A dictionary that maps tensors to numpy ndarrays.
        options: A (pointer to a) [`RunOptions`] protocol buffer, or None
        run_metadata: A (pointer to a) [`RunMetadata`] protocol buffer, or None

      Returns:
        A list of numpy ndarrays, corresponding to the elements of
        `fetch_list`.  If the ith element of `fetch_list` contains the
        name of an operation, the first Tensor output of that operation
        will be returned for that element.

      Raises:
        tf.errors.OpError: Or one of its subclasses on error.
      """
      # pylint: disable=protected-access
      feeds = dict((t.deref()._as_tf_output(), v) for t, v in feed_dict.items())
      fetches = [t._as_tf_output() for t in fetch_list]
      targets = [op._c_op for op in target_list]

      # pylint: enable=protected-access

      def _run_fn(feed_dict, fetch_list, target_list, options, run_metadata):
        # Ensure any changes to the graph are reflected in the runtime.
        self._extend_graph()
        return self._call_tf_sessionrun(options, feed_dict, fetch_list,
                                        target_list, run_metadata)

      def _prun_fn(handle, feed_dict, fetch_list):
        if target_list:
          raise RuntimeError('partial_run() requires empty target_list.')
        return self._call_tf_sessionprun(handle, feed_dict, fetch_list)

      if handle is None:
        return self._do_call(_run_fn, feeds, fetches, targets, options,
>                            run_metadata)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1359: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow.python.client.session.Session object at 0x7fb030202290>
fn = <function BaseSession._do_run.<locals>._run_fn at 0x7fb0301ecb90>
args = ({<tensorflow.python._pywrap_tf_session.TF_Output object at 0x7fb0302029f0>: array([1.], dtype=float32), <tensorflow.p... object at 0x7fb03020c130>, <tensorflow.python._pywrap_tf_session.TF_Output object at 0x7fb030208370>], [], None, None)
message = 'StringToNumberOp could not correctly convert string: hello\n\t [[node transform/StringToNumber (defined at /site-packages/tensorflow_transform/saved/saved_transform_io.py:274) ]]'
m = <re.Match object; span=(61, 96), match='[[{{node transform/StringToNumber}}'>

    def _do_call(self, fn, *args):
      try:
        return fn(*args)
      except errors.OpError as e:
        message = compat.as_text(e.message)
        m = BaseSession._NODEDEF_NAME_RE.search(message)
        node_def = None
        op = None
        if m is not None:
          node_name = m.group(3)
          try:
            op = self._graph.get_operation_by_name(node_name)
            node_def = op.node_def
          except KeyError:
            pass
        message = error_interpolation.interpolate(message, self._graph)
        if 'only supports NHWC tensor format' in message:
          message += ('\nA possible workaround: Try disabling Grappler optimizer'
                      '\nby modifying the config for creating the session eg.'
                      '\nsession_config.graph_options.rewrite_options.'
                      'disable_meta_optimizer = True')
>       raise type(e)(node_def, op, message)
E       tensorflow.python.framework.errors_impl.InvalidArgumentError: StringToNumberOp could not correctly convert string: hello
E                [[node transform/StringToNumber (defined at /site-packages/tensorflow_transform/saved/saved_transform_io.py:274) ]]
E       
E       Original stack trace for 'transform/StringToNumber':
E         File "/threading.py", line 890, in _bootstrap
E           self._bootstrap_inner()
E         File "/threading.py", line 926, in _bootstrap_inner
E           self.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 70, in run
E           self._work_item.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
E           self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
E           part_map, expected_outputs, fired_timers, expected_output_timers)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 947, in process_bundle
E           result_future = self._worker_handler.control_conn.push(process_bundle_req)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 349, in push
E           response = self.worker.do_instruction(request)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
E           getattr(request, request_type), request.instruction_id)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
E           bundle_processor.process_bundle(instruction_id))
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
E           element.data)
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
E           self.output(decoded_value)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in process
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
E           return _shared_map.acquire(self._key, constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
E           result = control_block.acquire(constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
E           result = constructor_fn()
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in <lambda>
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 385, in _make_graph_state
E           self._exclude_outputs, self._tf_config)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 209, in __init__
E           saved_model_dir, {}))
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 429, in partially_apply_saved_transform_internal
E           saved_model_dir, logical_input_map, tensor_replacement_map)
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 274, in _partially_apply_saved_transform_impl
E           input_map=input_map)
E         File "/site-packages/tensorflow/python/training/saver.py", line 1462, in import_meta_graph
E           **kwargs)[0]
E         File "/site-packages/tensorflow/python/training/saver.py", line 1486, in _import_meta_graph_with_return_elements
E           **kwargs))
E         File "/site-packages/tensorflow/python/framework/meta_graph.py", line 799, in import_scoped_meta_graph_with_return_elements
E           return_elements=return_elements)
E         File "/site-packages/tensorflow/python/util/deprecation.py", line 507, in new_func
E           return func(*args, **kwargs)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 405, in import_graph_def
E           producer_op_list=producer_op_list)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 513, in _import_graph_def_internal
E           _ProcessNewOps(graph)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 243, in _ProcessNewOps
E           for new_op in graph._add_new_tf_operations(compute_devices=False):  # pylint: disable=protected-access
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in _add_new_tf_operations
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in <listcomp>
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3510, in _create_op_from_tf_operation
E           ret = Operation(c_op, self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 1949, in __init__
E           self._traceback = tf_stack.extract_stack()

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/client/session.py:1384: InvalidArgumentError

During handling of the above exception, another exception occurred:

>   ???

apache_beam/runners/common.py:961: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

apache_beam/runners/common.py:726: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

apache_beam/runners/common.py:807: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

apache_beam/runners/common.py:1095: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7fb0301edbd0>
batch = [{'Ints': 1, 'Ints to Strings': 1, 'Strings': 'hello', 'Strings to Ints': '3'}]
saved_model_dir = '/tmp/tmpk509uook/tftransform_tmp/e744b1b491a04251b1ad4e5e458b4d3c'

    def process(self, batch, saved_model_dir):
      """Runs the given graph to realize the output `Tensor` or `SparseTensor`s.

      Runs the graph in a TF session for computing the output values of the
      `Tensor` or `SparseTensor`s, given an input row of data (input `Tensor` or
      `SparseTensor`s).

      Args:
        batch: the batch of elements being processed by the DoFn
        saved_model_dir: Directory containing saved model.

      Yields:
        A representation of output features as a dict mapping keys (logical column
        names) to values.
      """
      if self._graph_state is None:
        # If available, acquire will return a cached _GraphState, since calling
        # _make_graph_state is expensive.
        self._graph_state = self._shared_graph_state_handle.acquire(
            lambda: self._make_graph_state(saved_model_dir))

      # This should remain true throughout the lifetime of this DoFn, regardless
      # of whether or not self._graph_state was cached.
      assert self._graph_state.saved_model_dir == saved_model_dir

>     yield self._handle_batch(batch)

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:422: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7fb0301edbd0>
batch = [{'Ints': 1, 'Ints to Strings': 1, 'Strings': 'hello', 'Strings to Ints': '3'}]

    def _handle_batch(self, batch):
      self._update_metrics(batch)
      # Remove passthrough keys from the input data to make sure preprocessing_fn
      # won't see them. Making a copy of batch because mutating PCollection
      # elements is not allowed.
      # No need to remove (and cannot remove) the passthrough columns if
      # tfxio is used:
      # 1) The TensorAdapter expects the RecordBatch to be of the same schema as
      # statically determined by the TFXIO implementation the yields the
      # TensorAdapter.
      # 2) It's not possible to leak passthrough columns through TensorAdapter
      # because they are not going to be converted to Tensors.
      passthrough_data = None
      if self._passthrough_keys and not self._use_tfxio:
        batch = [copy.copy(x) for x in batch]
        passthrough_data = {
            key: [instance.pop(key) for instance in batch
                 ] for key in self._passthrough_keys
        }

      feed_list = self._make_feed_list(batch)
      try:
        outputs_list = self._graph_state.callable_get_outputs(*feed_list)
      except Exception as e:
        raise ValueError(
            """An error occured while trying to apply the transformation: "{}".
            Batch instances: {},
            Fetching the values for the following Tensor keys: {}.""".format(
>               str(e), batch, self._graph_state.outputs_tensor_keys))
E       ValueError: An error occured while trying to apply the transformation: "StringToNumberOp could not correctly convert string: hello
E                [[node transform/StringToNumber (defined at /site-packages/tensorflow_transform/saved/saved_transform_io.py:274) ]]
E       
E       Original stack trace for 'transform/StringToNumber':
E         File "/threading.py", line 890, in _bootstrap
E           self._bootstrap_inner()
E         File "/threading.py", line 926, in _bootstrap_inner
E           self.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 70, in run
E           self._work_item.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
E           self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
E           part_map, expected_outputs, fired_timers, expected_output_timers)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 947, in process_bundle
E           result_future = self._worker_handler.control_conn.push(process_bundle_req)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 349, in push
E           response = self.worker.do_instruction(request)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
E           getattr(request, request_type), request.instruction_id)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
E           bundle_processor.process_bundle(instruction_id))
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
E           element.data)
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
E           self.output(decoded_value)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in process
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
E           return _shared_map.acquire(self._key, constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
E           result = control_block.acquire(constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
E           result = constructor_fn()
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in <lambda>
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 385, in _make_graph_state
E           self._exclude_outputs, self._tf_config)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 209, in __init__
E           saved_model_dir, {}))
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 429, in partially_apply_saved_transform_internal
E           saved_model_dir, logical_input_map, tensor_replacement_map)
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 274, in _partially_apply_saved_transform_impl
E           input_map=input_map)
E         File "/site-packages/tensorflow/python/training/saver.py", line 1462, in import_meta_graph
E           **kwargs)[0]
E         File "/site-packages/tensorflow/python/training/saver.py", line 1486, in _import_meta_graph_with_return_elements
E           **kwargs))
E         File "/site-packages/tensorflow/python/framework/meta_graph.py", line 799, in import_scoped_meta_graph_with_return_elements
E           return_elements=return_elements)
E         File "/site-packages/tensorflow/python/util/deprecation.py", line 507, in new_func
E           return func(*args, **kwargs)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 405, in import_graph_def
E           producer_op_list=producer_op_list)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 513, in _import_graph_def_internal
E           _ProcessNewOps(graph)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 243, in _ProcessNewOps
E           for new_op in graph._add_new_tf_operations(compute_devices=False):  # pylint: disable=protected-access
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in _add_new_tf_operations
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in <listcomp>
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3510, in _create_op_from_tf_operation
E           ret = Operation(c_op, self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 1949, in __init__
E           self._traceback = tf_stack.extract_stack()
E       ".
E                 Batch instances: [{'Ints': 1, 'Ints to Strings': 1, 'Strings to Ints': '3', 'Strings': 'hello'}],
E                 Fetching the values for the following Tensor keys: ['Ints', 'Ints to Strings', 'Strings', 'Strings to Ints'].

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:367: ValueError

During handling of the above exception, another exception occurred:

    def test_tutorial_schema():

        cols = ["Ints", "Ints to Strings", "Strings to Ints", "Strings"]
        raw_data = [
            {cols[0]: 1, cols[1]: 1, cols[2]: "3", cols[3]: "hello"},
            {cols[0]: 2, cols[1]: 2, cols[2]: "4", cols[3]: "world"},
        ]

        original_types = [tf.float32, tf.float32, tf.string, tf.string]
        intended_types = [tf.float32, tf.string, tf.float32, tf.string]

        # Todo: generate these automatically based on intended_types
        true_types = [tf.float32, bytes, tf.float32, bytes]
        wrong_types = [bytes, tf.float32, bytes, tf.float32]

        # Todo: make this independendt of length of cols and original_types. For loop?
        raw_data_metadata = dataset_metadata.DatasetMetadata(
            dataset_schema.from_feature_spec(
                {
                    cols[0]: tf.io.FixedLenFeature([], original_types[0]),
                    cols[1]: tf.io.FixedLenFeature([], original_types[1]),
                    cols[2]: tf.io.FixedLenFeature([], original_types[2]),
                    cols[3]: tf.io.FixedLenFeature([], original_types[3]),
                }
            )
        )

        # Generate a preprocessing fn having intended_types internalized
        preprocessing_fn = generate_preprocessing_fn(intended_types)

        # Run the preprocessing_fn which
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            transformed_dataset, transform_fn = (
                raw_data,
                raw_data_metadata,
>           ) | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)

tests/unit/data_import/test_test.py:45: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py:567: in __ror__
    p.run().wait_until_finish()
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/pipeline.py:497: in run
    self._options).run(False)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/pipeline.py:510: in run
    return self.runner.run_pipeline(self, self._options)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py:130: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:179: in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:189: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:335: in run_stages
    bundle_context_manager,
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:545: in _run_stage
    expected_timer_output)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1051: in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:598: in result_iterator
    yield fs.pop().result()
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:435: in result
    return self.__get_result()
../miniconda/envs/tfx/lib/python3.7/concurrent/futures/_base.py:384: in __get_result
    raise self._exception
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py:44: in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1048: in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:947: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:349: in push
    response = self.worker.do_instruction(request)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
    getattr(request, request_type), request.instruction_id)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:506: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py:972: in process_bundle
    element.data)
../miniconda/envs/tfx/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py:218: in process_encoded
    self.output(decoded_value)
apache_beam/runners/worker/operations.py:330: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:332: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1030: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:553: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1122: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:195: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:670: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:671: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:963: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1045: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
../miniconda/envs/tfx/lib/python3.7/site-packages/future/utils/__init__.py:446: in raise_with_traceback
    raise exc.with_traceback(traceback)
apache_beam/runners/common.py:961: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:726: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:807: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
apache_beam/runners/common.py:1095: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:422: in process
    yield self._handle_batch(batch)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tensorflow_transform.beam.impl._RunMetaGraphDoFn object at 0x7fb0301edbd0>
batch = [{'Ints': 1, 'Ints to Strings': 1, 'Strings': 'hello', 'Strings to Ints': '3'}]

    def _handle_batch(self, batch):
      self._update_metrics(batch)
      # Remove passthrough keys from the input data to make sure preprocessing_fn
      # won't see them. Making a copy of batch because mutating PCollection
      # elements is not allowed.
      # No need to remove (and cannot remove) the passthrough columns if
      # tfxio is used:
      # 1) The TensorAdapter expects the RecordBatch to be of the same schema as
      # statically determined by the TFXIO implementation the yields the
      # TensorAdapter.
      # 2) It's not possible to leak passthrough columns through TensorAdapter
      # because they are not going to be converted to Tensors.
      passthrough_data = None
      if self._passthrough_keys and not self._use_tfxio:
        batch = [copy.copy(x) for x in batch]
        passthrough_data = {
            key: [instance.pop(key) for instance in batch
                 ] for key in self._passthrough_keys
        }

      feed_list = self._make_feed_list(batch)
      try:
        outputs_list = self._graph_state.callable_get_outputs(*feed_list)
      except Exception as e:
        raise ValueError(
            """An error occured while trying to apply the transformation: "{}".
            Batch instances: {},
            Fetching the values for the following Tensor keys: {}.""".format(
>               str(e), batch, self._graph_state.outputs_tensor_keys))
E       ValueError: An error occured while trying to apply the transformation: "StringToNumberOp could not correctly convert string: hello
E                [[node transform/StringToNumber (defined at /site-packages/tensorflow_transform/saved/saved_transform_io.py:274) ]]
E       
E       Original stack trace for 'transform/StringToNumber':
E         File "/threading.py", line 890, in _bootstrap
E           self._bootstrap_inner()
E         File "/threading.py", line 926, in _bootstrap_inner
E           self.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 70, in run
E           self._work_item.run()
E         File "/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
E           self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
E           part_map, expected_outputs, fired_timers, expected_output_timers)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 947, in process_bundle
E           result_future = self._worker_handler.control_conn.push(process_bundle_req)
E         File "/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 349, in push
E           response = self.worker.do_instruction(request)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
E           getattr(request, request_type), request.instruction_id)
E         File "/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
E           bundle_processor.process_bundle(instruction_id))
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
E           element.data)
E         File "/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
E           self.output(decoded_value)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in process
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
E           return _shared_map.acquire(self._key, constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
E           result = control_block.acquire(constructor_fn)
E         File "/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
E           result = constructor_fn()
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 416, in <lambda>
E           lambda: self._make_graph_state(saved_model_dir))
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 385, in _make_graph_state
E           self._exclude_outputs, self._tf_config)
E         File "/site-packages/tensorflow_transform/beam/impl.py", line 209, in __init__
E           saved_model_dir, {}))
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 429, in partially_apply_saved_transform_internal
E           saved_model_dir, logical_input_map, tensor_replacement_map)
E         File "/site-packages/tensorflow_transform/saved/saved_transform_io.py", line 274, in _partially_apply_saved_transform_impl
E           input_map=input_map)
E         File "/site-packages/tensorflow/python/training/saver.py", line 1462, in import_meta_graph
E           **kwargs)[0]
E         File "/site-packages/tensorflow/python/training/saver.py", line 1486, in _import_meta_graph_with_return_elements
E           **kwargs))
E         File "/site-packages/tensorflow/python/framework/meta_graph.py", line 799, in import_scoped_meta_graph_with_return_elements
E           return_elements=return_elements)
E         File "/site-packages/tensorflow/python/util/deprecation.py", line 507, in new_func
E           return func(*args, **kwargs)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 405, in import_graph_def
E           producer_op_list=producer_op_list)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 513, in _import_graph_def_internal
E           _ProcessNewOps(graph)
E         File "/site-packages/tensorflow/python/framework/importer.py", line 243, in _ProcessNewOps
E           for new_op in graph._add_new_tf_operations(compute_devices=False):  # pylint: disable=protected-access
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in _add_new_tf_operations
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3624, in <listcomp>
E           for c_op in c_api_util.new_tf_operations(self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 3510, in _create_op_from_tf_operation
E           ret = Operation(c_op, self)
E         File "/site-packages/tensorflow/python/framework/ops.py", line 1949, in __init__
E           self._traceback = tf_stack.extract_stack()
E       ".
E                 Batch instances: [{'Ints': 1, 'Ints to Strings': 1, 'Strings to Ints': '3', 'Strings': 'hello'}],
E                 Fetching the values for the following Tensor keys: ['Ints', 'Ints to Strings', 'Strings', 'Strings to Ints']. [while running 'AnalyzeAndTransformDataset/TransformDataset/Transform']

../miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py:367: ValueError
----------------------- Captured stderr call -----------------------
WARNING:tensorflow:From /home/vagrant/tf-template/tests/unit/data_import/test_test.py:32: from_feature_spec (from tensorflow_transform.tf_metadata.dataset_schema) is deprecated and will be removed in a future version.
Instructions for updating:
from_feature_spec is a deprecated, use schema_utils.schema_from_feature_spec
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
2020-08-19 17:25:45.899690: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2020-08-19 17:25:45.903087: W tensorflow/stream_executor/cuda/cuda_driver.cc:312] failed call to cuInit: UNKNOWN ERROR (303)
2020-08-19 17:25:45.903116: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (vagrant): /proc/driver/nvidia/version does not exist
2020-08-19 17:25:45.903383: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2020-08-19 17:25:45.929021: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2592000000 Hz
2020-08-19 17:25:45.929327: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55853e2cd3c0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-08-19 17:25:45.929347: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
WARNING:tensorflow:From /home/vagrant/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
WARNING:tensorflow:From /home/vagrant/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/tf_utils.py:220: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use ref() instead.
WARNING:tensorflow:Tensorflow version (2.3.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
========================= warnings summary =========================
tests/unit/data_import/test_test.py::test_tutorial_schema
  /home/vagrant/tf-template/tests/unit/data_import/test_test.py:70: UserWarning: 

  : : num: 1, 
  : : col_name: Ints to Strings
  : : current_type: <dtype: 'float32'>,
  : : intended_types[num]: <dtype: 'string'>
  : : inputs[col_name][0]: Tensor("strided_slice:0", shape=(), dtype=float32)
    f"\n\n: : num: {num}, "

tests/unit/data_import/test_test.py::test_tutorial_schema
  /home/vagrant/tf-template/tests/unit/data_import/test_test.py:70: UserWarning: 

  : : num: 2, 
  : : col_name: Strings
  : : current_type: <dtype: 'string'>,
  : : intended_types[num]: <dtype: 'float32'>
  : : inputs[col_name][0]: Tensor("strided_slice_1:0", shape=(), dtype=string)
    f"\n\n: : num: {num}, "

-- Docs: https://docs.pytest.org/en/latest/warnings.html

zoyahav commented 4 years ago

The very long error above boils down to:

    InvalidArgumentError: StringToNumberOp could not correctly convert string: hello

We can also see the following snippet, which helps us understand the issue:

    E Batch instances: [{'Ints': 1, 'Ints to Strings': 1, 'Strings to Ints': '3', 'Strings': 'hello'}],
    E Fetching the values for the following Tensor keys: ['Ints', 'Ints to Strings', 'Strings', 'Strings to Ints']. [while running 'AnalyzeAndTransformDataset/TransformDataset/Transform']

The Strings column has the value "hello", which can't be converted to a number. The issue with your code is that it relies on the iteration order of the dictionary keys matching a hardcoded list:

        for num, col_name in enumerate(inputs):
            current_type = inputs[col_name].dtype
            if not current_type == intended_types[num]:

It can be fixed by looking up each column's intended type by column name rather than by its position in the loop:

def generate_preprocessing_fn(intended_types, cols):
    """Return function which can alter column types."""

    def preprocessing_fn(inputs):
        """Preprocess input columns into transformed columns."""
        for num, col_name in enumerate(inputs):
            col_index = cols.index(col_name)
            ...
            if not current_type == intended_types[col_index]:
            ...
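As a rough illustration of why the positional lookup goes wrong, here is a small pure-Python sketch. It uses plain strings in place of the TensorFlow dtypes and takes the key order from the log above; the names `input_keys`, `positional` and `by_name` exist only for this example and are not part of the original test.

```python
# Column order as hardcoded in the test, with the intended type of each column.
# Plain strings stand in for tf.float32 / tf.string (an assumption for this sketch).
cols = ["Ints", "Ints to Strings", "Strings to Ints", "Strings"]
intended_types = ["float32", "string", "float32", "string"]

# Key order reported in the error log ("Fetching the values for the following
# Tensor keys"); note that it differs from the order of `cols`.
input_keys = ["Ints", "Ints to Strings", "Strings", "Strings to Ints"]

# Positional lookup: index intended_types by the loop counter.
positional = {name: intended_types[num] for num, name in enumerate(input_keys)}

# Name-based lookup: find the column's position in `cols` first.
by_name = {name: intended_types[cols.index(name)] for name in input_keys}

for name in input_keys:
    print(f"{name!r}: positional={positional[name]}, by_name={by_name[name]}")
```

With the positional lookup, 'Strings' is paired with float32, which is what sends "hello" into StringToNumber. The name-based lookup pairs it with string, as intended.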

Regarding:

What is the best way to do this using tensorflow, so that the data is formatted correctly in exampleGen, schemaGen, Trainer etc?

Transform (AnalyzeAndTransformDataset) is a good way to convert your data because the conversion is applied during training as well as during inference and analysis. However, it only affects components that are downstream from Transform (Trainer, Pusher, ModelValidator, etc.). It does not apply to ExampleGen or SchemaGen, because those run before Transform.

tordbb commented 4 years ago

Hi Zoyahav,

Again, thank you so much for helping me out!

Your fixes made the test I had provided work smoothly, and your guidance is very highly appreciated!

Since then, I have tried to alter my test so that it consumes example_gen objects instead of dictionaries containing lists. After all, my end goal is a reusable pipeline based on TFX (components like CsvExampleGen) with the ability to change types of each column as I wish.

I have not been able to do this using AnalyzeAndTransformDataset; however, tfx.Transform has brought me close to the goal. The attached notebook shows the code and where it crashes. The necessary data and files are attached and are also visible in the notebook.

Could you give me a hint on how to make this work, and whether I am headed in the right direction in my choice of tfx.Transform?

transform_column_format.zip

Best regards, Tord

tordbb commented 4 years ago

In case it is useful to other readers (and search engines), below is the notebook attached above, just in .md format. Below that follows the error message.



Interactive pipeline to convert column types

This notebook uses TensorFlow Extended (TFX) components to change the column types of a data file.

Import third party packages

import os
import pprint
from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
import tensorflow as tf
from pathlib import Path
from tf_template.utils import paths
context = InteractiveContext()
# Set up pretty print
pp = pprint.PrettyPrinter()
# Path to data file
root_path = paths.get_root_path()

paths.get_root_path() does the following, which does not seem to work in notebooks:

def get_root_path():
    return Path(__file__).absolute().parents[2]

raw_path = root_path / "data" / "raw"
csv_file_path = raw_path / "fruit_list" / "fruit_list.csv"

The data file behind "csv_file_path" has the following contents:

Quantity,Cost,Fruit,Weight
1,100,Banana,2.4
1,150,Banana,3.5
0,300,Kiwi,20.23
4,200,Apple,1.4
0,100,Antiapple,-1.4
5,500,Pinapple,255

# Path to transform component
transformation_path = str(root_path / "tf_template" / "data_transform" / "change_types.py")

The file behind "transformation_path" has the following contents:


import tensorflow as tf
import warnings

def preprocessing_fn(inputs):

    column_names = ["Quantity", "Cost", "Fruit", "Weight"]
    original_types = [tf.float32, tf.float32, tf.string, tf.float32]
    intended_types = [tf.float32, tf.string, tf.string, tf.string]

    outputs = {}

    for col_name in inputs:
        current_type = inputs[col_name].dtype
        col_index = column_names.index(col_name)
        current_intended_type = intended_types[col_index]
        if not current_type == current_intended_type:
            warnings.warn(f"col_name: {col_name}")
            warnings.warn(f"inputs[col_name]: {inputs[col_name]}")
            warnings.warn(f"current_type: {current_type}")
            warnings.warn(f"current_intended_type: {current_intended_type}")
            input_col = tf.sparse.to_dense(inputs[col_name])
            if current_intended_type == tf.string:
                outputs[col_name] = tf.strings.as_string(input_col)
            else:
                outputs[col_name] = tf.strings.to_number(
                    input_col, current_intended_type
                )

    return outputs

# Specify original and intended column types
column_names = ["Quantity", "Cost", "Fruit", "Weight"]
original_types = [tf.float32, tf.float32, tf.string, tf.float32]
intended_types = [tf.float32, tf.string, tf.string, tf.string]
# Load data file using CsvExampleGen. 
csv_files_directory = csv_file_path.parent
examples = external_input(csv_files_directory)
example_gen = CsvExampleGen(input=examples)
context.run(example_gen)

Print the contents of the first 3 (or fewer) rows in the training split of example_gen

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
# Generate StatisticsGen
statistics_gen = StatisticsGen(examples=example_gen.outputs["examples"])
context.run(statistics_gen)
# Generate SchemaGen
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs["statistics"], infer_feature_shape=False
)
context.run(schema_gen)
# Adjust formats using Transform
transform = Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_gen.outputs["schema"],
    module_file=transformation_path,
)
context.run(transform)
column_names = ["Quantity", "Cost", "Fruit", "Weight"]
original_types = [tf.float32, tf.float32, tf.string, tf.float32]
intended_types = [tf.float32, tf.string, tf.string, tf.string]

change_types_path = str(paths.get_root_path() / "tf_template" / "data_transform" / "change_types.py")

transform = Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_gen.outputs["schema"],
    module_file=change_types_path)
context.run(transform)
transform.outputs

And the error message which results from the line context.run(transform) is as follows:


WARNING:tensorflow:From /home/vagrant/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/components/transform/executor.py:511: Schema (from tensorflow_transform.tf_metadata.dataset_schema) is deprecated and will be removed in a future version.
Instructions for updating:
Schema is a deprecated, use schema_utils.schema_from_feature_spec to create a `Schema`
user_module:20: UserWarning: col_name: Cost
user_module:21: UserWarning: inputs[col_name]: SparseTensor(indices=Tensor("Cost/indices:0", shape=(None, 2), dtype=int64), values=Tensor("Cost/values:0", shape=(None,), dtype=int64), dense_shape=Tensor("Cost/shape:0", shape=(2,), dtype=int64))
user_module:22: UserWarning: current_type: <dtype: 'int64'>
user_module:23: UserWarning: current_intended_type: <dtype: 'string'>
user_module:20: UserWarning: col_name: Quantity
user_module:21: UserWarning: inputs[col_name]: SparseTensor(indices=Tensor("Quantity/indices:0", shape=(None, 2), dtype=int64), values=Tensor("Quantity/values:0", shape=(None,), dtype=int64), dense_shape=Tensor("Quantity/shape:0", shape=(2,), dtype=int64))
user_module:23: UserWarning: current_intended_type: <dtype: 'float32'>
--------------------------------------
ValueError                                Traceback (most recent call last)
~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py in _apply_op_helper(op_type_name, name, **keywords)
    469               as_ref=input_arg.is_ref,
--> 470               preferred_dtype=default_dtype)
    471         except TypeError as err:

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/framework/ops.py in convert_to_tensor(value, dtype, name, as_ref, preferred_dtype, dtype_hint, ctx, accepted_result_types)
   1474           "Tensor conversion requested dtype %s for Tensor with dtype %s: %r" %
-> 1475           (dtype.name, value.dtype.name, value))
   1476     return value

ValueError: Tensor conversion requested dtype string for Tensor with dtype int64: <tf.Tensor 'SparseToDense_1:0' shape=(None, None) dtype=int64>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-12-d5365d0f0017> in <module>
      5     module_file=transformation_path,
      6 )
----> 7 context.run(transform)

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
     64       # __IPYTHON__ variable is set by IPython, see
     65       # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 66       return fn(*args, **kwargs)
     67     else:
     68       absl.logging.warning(

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
    166         component, pipeline_info, driver_args, metadata_connection,
    167         beam_pipeline_args, additional_pipeline_args)
--> 168     execution_id = launcher.launch().execution_id
    169 
    170     return execution_result.ExecutionResult(

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
    203                          execution_decision.input_dict,
    204                          execution_decision.output_dict,
--> 205                          execution_decision.exec_properties)
    206 
    207     absl.logging.info('Running publisher for %s',

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
     65         executor_context)  # type: ignore
     66 
---> 67     executor.Do(input_dict, output_dict, exec_properties)

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
    389       label_outputs[labels.CACHE_OUTPUT_PATH_LABEL] = cache_output
    390     status_file = 'status_file'  # Unused
--> 391     self.Transform(label_inputs, label_outputs, status_file)
    392     absl.logging.debug('Cleaning up temp path %s on executor success',
    393                        temp_path)

~/miniconda/envs/tfx/lib/python3.7/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
    924     # order to fail faster if it fails.
    925     analyze_input_columns = tft.get_analyze_input_columns(
--> 926         preprocessing_fn, feature_spec_or_typespecs)
    927 
    928     if not compute_statistics and not materialize_output_paths:

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow_transform/inspect_preprocessing_fn.py in get_analyze_input_columns(preprocessing_fn, specs)
     56     input_signature = impl_helper.batched_placeholders_from_specs(
     57         specs)
---> 58     _ = preprocessing_fn(input_signature.copy())
     59 
     60     tensor_sinks = graph.get_collection(analyzer_nodes.TENSOR_REPLACEMENTS)

~/tf-template/tf_template/data_transform/change_types.py in preprocessing_fn(inputs)
     27             else:
     28                 outputs[col_name] = tf.strings.to_number(
---> 29                     input_col, current_intended_type
     30                 )
     31 

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py in wrapper(*args, **kwargs)
    199     """Call target, and fall back on dispatchers if there is a TypeError."""
    200     try:
--> 201       return target(*args, **kwargs)
    202     except (TypeError, ValueError):
    203       # Note: convert_to_eager_tensor currently raises a ValueError, not a

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/ops/string_ops.py in string_to_number(input, out_type, name)
    477     A `Tensor` of type `out_type`.
    478   """
--> 479   return gen_parsing_ops.string_to_number(input, out_type, name)
    480 
    481 

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/ops/gen_parsing_ops.py in string_to_number(string_tensor, out_type, name)
   2336   _, _, _op, _outputs = _op_def_library._apply_op_helper(
   2337         "StringToNumber", string_tensor=string_tensor, out_type=out_type,
-> 2338                           name=name)
   2339   _result = _outputs[:]
   2340   if _execute.must_record_gradient():

~/miniconda/envs/tfx/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py in _apply_op_helper(op_type_name, name, **keywords)
    491           if input_arg.type != types_pb2.DT_INVALID:
    492             raise TypeError("%s expected type of %s." %
--> 493                             (prefix, dtypes.as_dtype(input_arg.type).name))
    494           else:
    495             # Update the maps with the default, if needed.

TypeError: Input 'string_tensor' of 'StringToNumber' Op has type int64 that does not match expected type of string.
zoyahav commented 4 years ago

Hi Tord, I'm not able to open or run colabs in zips; ideally I'd have a hosted colab I could look at. Regardless, this is still the same issue as before: tf.strings.to_number in your preprocessing_fn is being passed a Tensor with dtype int64, which is incompatible. This shouldn't be happening at all, since it looks like your preprocessing_fn is only trying to convert from float32 to string:

original_types = [tf.float32, tf.float32, tf.string, tf.float32]
intended_types = [tf.float32, tf.string, tf.string, tf.string]

I think that the issue here is that SchemaGen is inferring your float32 features as int64, and then since float32 != int64, you try to call tf.strings.to_number. You could either manually update your schema to accurately reflect your data, or update your preprocessing_fn to be consistent with the schema.

Regardless, the way the preprocessing_fn is written is a bit brittle for an automated pipeline such as TFX. I would at least change it to something like:

def preprocessing_fn(inputs):
  output_dtypes = {
    "Quantity": tf.float32,
    "Cost": tf.string,
    "Fruit": tf.string,
    "Weight": tf.string,
  }
  outputs = {}

  for col_name, col_value in inputs.items():
    current_dtype = col_value.dtype
    # Columns not listed in output_dtypes keep their current dtype.
    output_dtype = output_dtypes.get(col_name, current_dtype)
    if current_dtype != output_dtype:
      if isinstance(col_value, tf.SparseTensor):
        col_value = tf.sparse.to_dense(col_value)
      if output_dtype == tf.string:
        outputs[col_name] = tf.strings.as_string(col_value)
      elif current_dtype == tf.string:
        outputs[col_name] = tf.strings.to_number(col_value, output_dtype)
      else:
        # Both input and output dtypes are numerical, or compatible with numerical.
        outputs[col_name] = tf.cast(col_value, output_dtype)
    else:
      # Either current_dtype == output_dtype, or col_name not in
      # output_dtypes - add it to outputs anyway.
      outputs[col_name] = col_value
  return outputs
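
To make the branch order above easier to check without running a pipeline, here is a minimal plain-Python sketch of the same decision logic. It is only a model: dtypes are plain strings rather than real tf.DType objects, `pick_conversion` is a hypothetical helper (not part of TensorFlow or TFT), and it assumes a column missing from the mapping is passed through unchanged, which is what the comments in the loop describe.

```python
from typing import Optional


def pick_conversion(current_dtype: str, output_dtype: Optional[str]) -> str:
    """Return the name of the op the loop would apply to one column.

    Dtypes are plain strings here (e.g. "int64", "string"); this models the
    branching only and does not call TensorFlow.
    """
    if output_dtype is None or current_dtype == output_dtype:
        # Column not listed, or already the right dtype: pass it through.
        return "identity"
    if output_dtype == "string":
        # Any non-string dtype -> string.
        return "tf.strings.as_string"
    if current_dtype == "string":
        # string -> numeric dtype.
        return "tf.strings.to_number"
    # numeric -> a different numeric dtype.
    return "tf.cast"


# The case from the traceback: the schema says int64, the target is string.
print(pick_conversion("int64", "string"))
print(pick_conversion("string", "float32"))
print(pick_conversion("int64", "float32"))
print(pick_conversion("float32", None))
```

The point of checking the target dtype first is that an int64 column headed for string never reaches tf.strings.to_number, which is the call that failed in the traceback.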
tordbb commented 4 years ago

Once again, thank you so much!

Your neat version of preprocessing_fn has been adopted into my notebook. It has enabled me to move a few more steps, but now I am struggling with an error message related to column(s) having an invalid shape:

Feature Fruit (Tensor("SparseToDense:0", shape=(None, None), dtype=string)) had invalid shape (None, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size

PS: I have included warnings in my code in order to get more helpful debugging information along the way. PPS: I see you have provided support to someone with a similar issue earlier, but none of the proposed fixes seem to work for my code.

The notebook is now uploaded and openly accessible at colab. It is simplified so that it generates all the data and module files it needs to run. I made a successful run with it by selecting "Connect to hosted runtime", and "Runtime" -> "Run All". https://colab.research.google.com/drive/1skLbDOCmf9ATmM1fB91nBQZvFIrKznqG?usp=sharing

Any help on this would be highly appreciated. Feel free to modify my colab notebook - it should be set up for anyone to edit!

zoyahav commented 4 years ago

I can't seem to save changes in the colab, but I was able to fix the issue by replacing:

            col_values = tf.sparse.to_dense(col_values)

with

            col_values = tft.sparse_tensor_to_dense_with_shape(
                col_values,
                shape=(None, 1),
                default_value='' if current_dtype is tf.string else 0)

Unfortunately, tf.sparse.to_dense still drops the shape information sometimes, so we have this helper method in TFT to retain it. With this change Transform finished successfully.
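
For reference, the rule behind that error message can be sketched in plain Python. This is only a model of the check the message describes (every dimension except the batch dimension must have a known size), with shapes written as tuples and None standing for an unknown size. `is_valid_fixed_len_shape` is a hypothetical name for this example, not a TFT function.

```python
from typing import Optional, Tuple

Shape = Tuple[Optional[int], ...]


def is_valid_fixed_len_shape(shape: Shape) -> bool:
    """True if every dimension after the first (batch) one has a known size."""
    return all(dim is not None for dim in shape[1:])


# Shape left behind by tf.sparse.to_dense in this thread.
print(is_valid_fixed_len_shape((None, None)))
# Shape requested from the TFT helper.
print(is_valid_fixed_len_shape((None, 1)))
```

The first call reports False and the second True, which matches why the (None, None) tensor was rejected and the (None, 1) one was accepted.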

tordbb commented 4 years ago

Thanks, the code now works! Is there already an active issue established to fix this in TensorFlow? My suggested fix would be that if

tf.sparse.to_dense(col_values)

gives the above error message, TensorFlow could automatically call

            col_values = tft.sparse_tensor_to_dense_with_shape(
                col_values,
                shape=(None, 1),
                default_value='' if current_dtype is tf.string else 0)

rather than throwing the error message. Perhaps, they would still throw the message, but as a warning instead?

zoyahav commented 4 years ago

This is not possible due to the order of dependencies (TFT depends on TF, not the other way around). TF has a tracking issue for this, and hopefully it will be fixed at some point. Currently, the workaround is to call tf.sparse.to_dense and then call set_shape() on the result with the expected shape (in your case, [None, 1]).
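
As a rough illustration of what set_shape() does in that workaround: it merges the shape you supply into the statically known shape, filling in unknown dimensions and rejecting conflicts. The sketch below models that merge in plain Python with tuples. `merge_static_shape` is a hypothetical helper written for this example and does not call TensorFlow.

```python
from typing import Optional, Tuple

Shape = Tuple[Optional[int], ...]


def merge_static_shape(known: Shape, expected: Shape) -> Shape:
    """Combine two static shapes dimension by dimension.

    None means "unknown". An unknown dimension takes the other side's value;
    two known dimensions must agree.
    """
    if len(known) != len(expected):
        raise ValueError(f"rank mismatch: {known} vs {expected}")
    merged = []
    for k, e in zip(known, expected):
        if k is not None and e is not None and k != e:
            raise ValueError(f"incompatible dimensions: {k} vs {e}")
        merged.append(k if k is not None else e)
    return tuple(merged)


# tf.sparse.to_dense left the static shape as (None, None);
# the workaround asserts (None, 1) on top of it.
print(merge_static_shape((None, None), (None, 1)))
```

Because the merge only narrows unknown dimensions, asserting [None, 1] is safe as long as each row really holds a single value, and a conflicting shape raises an error instead of being silently accepted.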

UsharaniPagadala commented 2 years ago

@tordbb Closing this issue as it is resolved. Please feel free to reopen if this still persists. Thanks.