tensorflow / transform

Input pipeline framework
Apache License 2.0
984 stars 213 forks source link

TF Transform exception "unhashable type: 'ConfigProto'" when there is an unused "import pyspark" statement #144

Closed junwan01 closed 3 years ago

junwan01 commented 4 years ago

tensorflow==2.0.0 apache-beam==2.16.0 tensorflow-transform==0.15.0 python 3.7

When I run a transform that has combiner invoked, e.g. tft.bucketize(), it crashes with message: "unhashable type: 'ConfigProto'", if there is an import pyspark statement at the beginning of the module, even though the imported class is never referenced inside the transform code. If I move the import below the import tensorflow_transform as tft, it works again.

When I step debug, the Python seems to be confused about the object graph_state_options of class _QuantilesGraphStateOptions at certain point, and think it is DType, and invoked wrong __hash__().

Spark is not part of transform logic, however it is imported by some upstream Python code that produces the tfrecord dataset to be used by TF Transform, so we have to import it.

The same test code runs fine under TFT 0.14.0, TF 1.14 and apache-beam 2.15.0.

Here is the slimmed down code that can reproduce the error.

import tempfile
import tensorflow as tf
# If the import is before tensorflow_transform, will crash.
from pyspark.sql.types import Row

import tensorflow_transform as tft

# If import is after tensorflow_transform, will be fine.
# from pyspark.sql.types import Row

from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from apache_beam.options.pipeline_options import PipelineOptions

with tempfile.TemporaryDirectory("", "config_proto_") as temp_dir:
    print("temp_dir: {}".format(temp_dir))
    def preprocessing_fn(inputs):
        outputs = {}
        outputs['bucket_ft'] = tft.bucketize(inputs['bucket_ft'], num_buckets=4)
        return outputs

    r = 10
    examples = []
    for i in range(r):
        example = tf.train.Example()
        example.features.feature['bucket_ft'].int64_list.value.extend([i])
        examples.append(example.SerializeToString())

    feature_spec = {'bucket_ft': tf.io.FixedLenFeature([], tf.int64)}
    train_proto = schema_utils.schema_from_feature_spec(feature_spec)
    train_metadata = dataset_metadata.DatasetMetadata(train_proto)

    with beam.Pipeline(options=PipelineOptions()) as p:
        with tft_beam.Context(temp_dir=temp_dir):
            coder = tft.coders.ExampleProtoCoder(train_metadata.schema, serialized=True)

            train_data = p | beam.Create(examples) | 'DecodeTrain' >> beam.Map(coder.decode)

            (transformed_train_data, transformed_metadata), transform_fn = (
                    (train_data, train_metadata)
                    | 'AnalyzeAndTransform' >> tft_beam.AnalyzeAndTransformDataset(
                preprocessing_fn))
            (transformed_train_data | "print" >> beam.Map(lambda x: print(x)))

Here is the complete error message:

WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
2019-10-29 13:40:59.556689: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-10-29 13:40:59.571302: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f90ef033350 executing computations on platform Host. Devices:
2019-10-29 13:40:59.571315: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
WARNING:tensorflow:From /Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
WARNING:tensorflow:From /Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 

Ran 1 test in 1.050s

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 846, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 871, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/beam/analyzer_impls.py", line 672, in create_accumulator
    return self._combiner.create_accumulator()
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1239, in create_accumulator
    graph_state = self._get_graph_state()
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1236, in _get_graph_state
    self._graph_state_options)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1568, in get_graph_state
    result = self._graph_states_by_options.get(graph_state_options)
TypeError: unhashable type: 'ConfigProto'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/unittest/case.py", line 59, in testPartExecutor
    yield
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/unittest/case.py", line 615, in run
    testMethod()
  File "/Users/jwan/src/Majic/majic-pyutil/majicutil/tests/test_tft_bucketize.py", line 57, in test_tft_analyzers_config_proto
    preprocessing_fn))
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 427, in __exit__
    self.run().wait_until_finish()
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 407, in run
    self._options).run(False)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 420, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 371, in run_pipeline
    default_environment=self._default_environment))
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 378, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 460, in run_stages
    stage_context.safe_coders)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 738, in _run_stage
    result, splits = bundle_manager.process_bundle(data_input, data_output)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1719, in process_bundle
    part, expected_outputs), part_inputs):
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/jwan/.pyenv/versions/3.7.3/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1719, in <lambda>
    part, expected_outputs), part_inputs):
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1657, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1085, in push
    response = self.worker.do_instruction(request)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 428, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
  File "apache_beam/runners/worker/operations.py", line 435, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 100, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 846, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 871, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/beam/analyzer_impls.py", line 672, in create_accumulator
    return self._combiner.create_accumulator()
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1239, in create_accumulator
    graph_state = self._get_graph_state()
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1236, in _get_graph_state
    self._graph_state_options)
  File "/Users/jwan/src/Majic/majic-pyutil/venv-3.7/lib/python3.7/site-packages/tensorflow_transform/analyzers.py", line 1568, in get_graph_state
    result = self._graph_states_by_options.get(graph_state_options)
TypeError: unhashable type: 'ConfigProto' [while running 'AnalyzeAndTransform/AnalyzeDataset/CacheableCombineAccumulate[bucketize/quantiles]/InitialCombineGlobally/KeyWithVoid']

Process finished with exit code 1
rmothukuru commented 4 years ago

@junwan01, I have tried reproducing the error but I didn't get any error, even though I have used import pyspark before and after import tensorflow_transform as tft. It may be because you are defining the Class but not creating an Instance of it. Can you please share the complete code so that we can reproduce the error at our end. Thanks!

junwan01 commented 4 years ago

@rmothukuru, thanks for looking into this.

I have update the code in the original report, so that it will reproduce the problem. My initial code was indended to be run as a unit test in PyCharm, and not a standalone as script that will trigger the failure. I have stripped unittest class, and only keep the core part of the code that is problematic. You should be able to run this as a script and see the error. Thanks!

rmothukuru commented 4 years ago

Could reproduce the issue in Google Colab with Tensorflow Version 2.0.0, Tensorflow_transform Version 0.15.0 and Apache_Beam Version 2.16.0.

Here is the Github Gist of Google Colab.

iindyk commented 3 years ago

Was fixed by https://github.com/tensorflow/transform/commit/d17afa3a12b038be342d6630e9556d010712cb23