Building-ML-Pipelines / building-machine-learning-pipelines

Code repository for the O'Reilly publication "Building Machine Learning Pipelines" by Hannes Hapke & Catherine Nelson
MIT License
582 stars 250 forks source link

Transform code snippet for Computer Vision problem set in the book not working (or I couldn't make it work) #25

Open shabie opened 3 years ago

shabie commented 3 years ago

The book provides code snippets for the computer vision problem set but it seems to be not working for the transform. I mean specifcally the following code:

def process_image(raw_image):
    raw_image = tf.reshape(raw_image, [-1])
    img_rgb = tf.image.decode_jpeg(raw_image, channels=3)
    img_gray = tf.image.rgb_to_grayscale(img_rgb)
    img = tf.image.convert_image_dtype(img_gray, tf.float32)
    resized_img = tf.image.resize_with_pad(
        img,
        target_height=300,
        target_width=300,
    )
    img_grayscale = tf.image.rgb_to_grayscale(resized_img)
    return tf.reshape(img_grayscale, [-1, 300, 300, 1])

I am using it as follows in the preprocessing_fn:

def preprocessing_fn(inputs):
    image_raw = inputs['image_raw']
    label = inputs['label']
    label_integerized = tft.compute_and_apply_vocabulary(label)
    img_preprocessed = process_image(image_raw)  ## used here
    return {
      'img_preprocessed': img_preprocessed,
      'label_integerized': label_integerized,
    }

This is being called in the Transform step of the pipeline:

transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath("module.py"),
)

context.run(transform)

The TFRecordDataset is a two-feature dataset one containing the raw (JPEG) image and other one contains the label as string (stored also as bytes). It was generated using pretty much the same code shown earlier in the book under the Data Ingestion chapter.

When I run the above, I get the following traceback:

---------------------------------------------------------------------------
InvalidArgumentError                      Traceback (most recent call last)
~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs, op_def)
   1811   try:
-> 1812     c_op = pywrap_tf_session.TF_FinishOperation(op_desc)
   1813   except errors.InvalidArgumentError as e:

InvalidArgumentError: Shape must be rank 0 but is rank 1 for '{{node DecodeJpeg}} = DecodeJpeg[acceptable_fraction=1, channels=3, dct_method="", fancy_upscaling=true, ratio=1, try_recover_truncated=false](Reshape)' with input shapes: [?].

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-38-795928f0e78f> in <module>
      5 )
      6 
----> 7 context.run(transform)

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
     65       # __IPYTHON__ variable is set by IPython, see
     66       # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 67       return fn(*args, **kwargs)
     68     else:
     69       absl.logging.warning(

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
    175         telemetry_utils.LABEL_TFX_RUNNER: runner_label,
    176     }):
--> 177       execution_id = launcher.launch().execution_id
    178 
    179     return execution_result.ExecutionResult(

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
    203                          execution_decision.input_dict,
    204                          execution_decision.output_dict,
--> 205                          execution_decision.exec_properties)
    206 
    207     absl.logging.info('Running publisher for %s',

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
     65         executor_context)  # type: ignore
     66 
---> 67     executor.Do(input_dict, output_dict, exec_properties)

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
    388       label_outputs[labels.CACHE_OUTPUT_PATH_LABEL] = cache_output
    389     status_file = 'status_file'  # Unused
--> 390     self.Transform(label_inputs, label_outputs, status_file)
    391     absl.logging.debug('Cleaning up temp path %s on executor success',
    392                        temp_path)

~/projects/datadrivers/venv/lib/python3.6/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
    886     # order to fail faster if it fails.
    887     analyze_input_columns = tft.get_analyze_input_columns(
--> 888         preprocessing_fn, typespecs)
    889 
    890     if not compute_statistics and not materialize_output_paths:

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow_transform/inspect_preprocessing_fn.py in get_analyze_input_columns(preprocessing_fn, specs)
     56     input_signature = impl_helper.batched_placeholders_from_specs(
     57         specs)
---> 58     _ = preprocessing_fn(input_signature.copy())
     59 
     60     tensor_sinks = graph.get_collection(analyzer_nodes.TENSOR_REPLACEMENTS)

~/projects/datadrivers/module.py in preprocessing_fn(inputs)
     21     label = inputs['label']
     22     label_integerized = tft.compute_and_apply_vocabulary(label)
---> 23     img_preprocessed = process_image(image_raw)
     24     return {
     25       'img_preprocessed': img_preprocessed,

~/projects/datadrivers/module.py in process_image(raw_image)
      5 def process_image(raw_image):
      6     raw_image = tf.reshape(raw_image, [-1])
----> 7     img_rgb = tf.io.decode_jpeg(raw_image, channels=3)
      8     img_gray = tf.image.rgb_to_grayscale(img_rgb)
      9     img = tf.image.convert_image_dtype(img_gray, tf.float32)

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/ops/gen_image_ops.py in decode_jpeg(contents, channels, ratio, fancy_upscaling, try_recover_truncated, acceptable_fraction, dct_method, name)
   1101                       try_recover_truncated=try_recover_truncated,
   1102                       acceptable_fraction=acceptable_fraction,
-> 1103                       dct_method=dct_method, name=name)
   1104   _result = _outputs[:]
   1105   if _execute.must_record_gradient():

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/framework/op_def_library.py in _apply_op_helper(op_type_name, name, **keywords)
    742       op = g._create_op_internal(op_type_name, inputs, dtypes=None,
    743                                  name=scope, input_types=input_types,
--> 744                                  attrs=attr_protos, op_def=op_def)
    745 
    746     # `outputs` is returned as a separate return value so that the output

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/framework/ops.py in _create_op_internal(self, op_type, inputs, dtypes, input_types, name, attrs, op_def, compute_device)
   3483           input_types=input_types,
   3484           original_op=self._default_original_op,
-> 3485           op_def=op_def)
   3486       self._create_op_helper(ret, compute_device=compute_device)
   3487     return ret

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/framework/ops.py in __init__(self, node_def, g, inputs, output_types, control_inputs, input_types, original_op, op_def)
   1973         op_def = self._graph._get_op_def(node_def.op)
   1974       self._c_op = _create_c_op(self._graph, node_def, inputs,
-> 1975                                 control_input_ops, op_def)
   1976       name = compat.as_str(node_def.name)
   1977     # pylint: enable=protected-access

~/projects/datadrivers/venv/lib/python3.6/site-packages/tensorflow/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs, op_def)
   1813   except errors.InvalidArgumentError as e:
   1814     # Convert to ValueError for backwards compatibility.
-> 1815     raise ValueError(str(e))
   1816 
   1817   return c_op

ValueError: Shape must be rank 0 but is rank 1 for '{{node DecodeJpeg}} = DecodeJpeg[acceptable_fraction=1, channels=3, dct_method="", fancy_upscaling=true, ratio=1, try_recover_truncated=false](Reshape)' with input shapes: [?].
shabie commented 3 years ago

Well as an update, I was able to move forward by adding the following line as the first to the proces_image func:

raw_image = tf.squeeze(raw_image)

I skipped the label processing for now to see how far I get but end up with yet another error:

InvalidArgumentError: contents must be scalar, got shape [2]
     [[{{node transform/DecodeJpeg}}]]
festeh commented 3 years ago

I believe that is because preprocessing_fn recieves a batch of images, but process_image expects a single image encoded in a byte string (see decode_jpeg docs). You can try this: img_preprocessed=tf.map_fn(process_image, image_raw, dtype=tf.float32)

wardVD commented 3 years ago

@festeh, tried that but this gives the following error:

TypeError: Failed to convert object of type <class 'tensorflow.python.framework.sparse_tensor.SparseTensor'> to Tensor. Contents: SparseTensor(indices=Tensor("DeserializeSparse:0", shape=(None, 1), dtype=int64), values=Tensor("DeserializeSparse:1", shape=(None,), dtype=string), dense_shape=Tensor("DeserializeSparse:2", shape=(1,), dtype=int64)). Consider casting elements to a supported type.

hanneshapke commented 3 years ago

What the starting point for your transformation? a byte string? Do you load the byte string in your ExampleGen component?

hanneshapke commented 3 years ago

You can call the process_image function as follows:

    fn = lambda image: process_image(image)
    img_preprocessed = tf.map_fn(fn, inputs['images_raw'], dtype=tf.float32)

Please note the preprocessing_fn gets a batch of records (e.g. images). The transform function needs to handle this. In the particular case, decode_jpeg can't handle batches. Therefore, we'll need to call the function via the lambda + map_fn calls.

Please close the issue if it solves your issue. Thank you!

wardVD commented 3 years ago

Hi all.

Found the issue.

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)

I've created a PR that should provide you with a working end-to-end example:

https://github.com/Building-ML-Pipelines/building-machine-learning-pipelines/pull/46

jashshah commented 3 years ago

@shabie were you able to run this part of the beam pipeline as defined under Chapter 5: Standalone Execution of TFT for the Computer Vision Problem?

import tempfile
import tensorflow_transform.beam.impl as tft_beam

with beam.Pipeline() as pipeline:
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    tfrecord_file = "/your/tf_records_file.tfrecord"
    raw_data = (
    pipeline | beam.io.ReadFromTFRecord(tfrecord_file))
    transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
    preprocessing_fn))

I keep running into the following error:

TypeError: byte indices must be integers or slices, not str