tensorflow / transform

Input pipeline framework
Apache License 2.0

TypeError: an integer is required [while running 'AnalyzeAndTransformDataset/TransformDataset/InstrumentInputBytes[Transform]'] #294

Open santoshcbit2002 opened 1 year ago

santoshcbit2002 commented 1 year ago

Hi,

The TF Transform Analyze-and-Transform step fails when executed on the Dataflow runner, but completes successfully when the pipeline is run on the Direct runner. For both the Direct run and the Dataflow run I used the exact same train.csv file hosted in GCS.

Here is the code for a very minimal preprocessing step:

```python
# Preprocessing function
def preprocessing_fn(inputs):
    import tensorflow as tf

    SELECTED_NUMERIC_FEATURE_KEYS = [
        'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',
        'feature_6', 'feature_7', 'feature_8', 'feature_9', 'feature_10',
        'feature_11', 'feature_12', 'feature_13', 'feature_14', 'feature_15',
        'feature_16', 'feature_17', 'feature_18', 'feature_19', 'feature_20',
        'feature_21', 'feature_22', 'feature_23', 'feature_24', 'feature_25',
        'feature_26', 'feature_27', 'feature_28']

    LABEL_KEY = 'label'

    outputs = dict()
    # Pass the numeric features through unchanged.
    for key in SELECTED_NUMERIC_FEATURE_KEYS:
        outputs[key] = inputs[key]

    # Cast the float label to int64.
    outputs[LABEL_KEY] = tf.dtypes.cast(inputs[LABEL_KEY], tf.dtypes.int64)

    return outputs


def encode_data(batch, temp):
    from tfx_bsl.coders.example_coder import RecordBatchToExamples
    return RecordBatchToExamples(batch)
```
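Since `AnalyzeAndTransformDataset` is called with `output_record_batches=True` below, each element reaching `encode_data` should be a `(pyarrow.RecordBatch, passthrough features)` pair, which is why the function takes two arguments when invoked via `beam.FlatMapTuple`. A small self-contained sketch of what `RecordBatchToExamples` produces (the column name and values here are made up):

```python
# Standalone illustration with hypothetical data: RecordBatchToExamples
# turns a pyarrow.RecordBatch into one serialized tf.Example per row.
import pyarrow as pa
from tfx_bsl.coders.example_coder import RecordBatchToExamples

batch = pa.RecordBatch.from_arrays(
    [pa.array([[1.0], [2.0]], type=pa.large_list(pa.float32()))],
    names=['feature_1'])

serialized = RecordBatchToExamples(batch)
print(len(serialized))  # 2 serialized tf.Example byte strings
```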

Pipeline code:

```python
# Imports added for completeness (not shown in the original snippet);
# they match the APIs used below.
import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from apache_beam.options.pipeline_options import PipelineOptions
from tfx_bsl.public import tfxio


def transform_data(train_data_file, test_data_file,
                   transformed_train_dataset_location,
                   transformed_test_dataset_location,
                   transform_func_location, args):

    runner = args['runner']
    temp_dir = args.pop('temp_dir')
    input_schema = args.pop('input_schema')
    ordered_columns = args.pop('ordered_columns')

    # Note: raw_data_feature_spec is defined outside this function (see the
    # follow-up comment below); it overrides the popped input_schema value.
    input_schema = tft.DatasetMetadata.from_feature_spec(raw_data_feature_spec).schema

    # Turn the remaining args into Beam CLI-style options.
    params2str = lambda key, value: f"--{key}={value}" if value else f"--{key}"
    options = []
    for (key, value) in args.items():
        options.append(params2str(key, value))

    print(options)
    pipeline_options = PipelineOptions(options)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        with tft_beam.Context(temp_dir=temp_dir):
            train_csv_tfxio = tfxio.CsvTFXIO(
                file_pattern=train_data_file,
                skip_header_lines=1,
                column_names=ordered_columns,
                schema=input_schema)

            raw_data = (
                pipeline
                | 'ReadTrainCsv' >> train_csv_tfxio.BeamSource()
            )

            raw_dataset = (raw_data, train_csv_tfxio.TensorAdapterConfig())

            transformed_dataset, transform_fn = (
                raw_dataset
                | tft_beam.AnalyzeAndTransformDataset(
                    preprocessing_fn, output_record_batches=True)
            )

            transformed_data, _ = transformed_dataset

            _ = (transformed_data
                 | 'EncodeTrainData' >> beam.FlatMapTuple(encode_data)
                 | 'WriteTrainData' >> beam.io.WriteToTFRecord(
                     transformed_train_dataset_location)
                 )

            # Apply transform function to test data.
            test_csv_tfxio = tfxio.CsvTFXIO(
                file_pattern=test_data_file,
                skip_header_lines=1,
                column_names=ordered_columns,
                schema=input_schema
            )

            raw_test_data = (
                pipeline
                | 'ReadTestCsv' >> test_csv_tfxio.BeamSource()
            )

            raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig())

            transformed_test_dataset = (
                (raw_test_dataset, transform_fn)
                | tft_beam.TransformDataset(output_record_batches=True)
            )

            transformed_test_data, _ = transformed_test_dataset

            _ = (transformed_test_data
                 | 'EncodeTestData' >> beam.FlatMapTuple(encode_data)
                 | 'WriteTestData' >> beam.io.WriteToTFRecord(
                     transformed_test_dataset_location)
                 )

            _ = (
                transform_fn
                | 'WriteTransformFn' >> tft_beam.WriteTransformFn(transform_func_location)
            )
```
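For context, a hypothetical driver for this function could look like the following; every path and option value here is a placeholder, not taken from the actual job:

```python
# Hypothetical invocation; all paths and option values are placeholders.
args = {
    'runner': 'DataflowRunner',
    'temp_dir': 'gs://my-bucket/tft-temp',        # popped by transform_data
    'input_schema': None,                         # popped, then recomputed inside
    'ordered_columns': ['label'] + [f'feature_{i}' for i in range(1, 29)],
    'project': 'my-project',                      # becomes --project=my-project
    'region': 'us-central1',
    'temp_location': 'gs://my-bucket/beam-temp',
}

transform_data(
    train_data_file='gs://my-bucket/data/train.csv',
    test_data_file='gs://my-bucket/data/test.csv',
    transformed_train_dataset_location='gs://my-bucket/out/train',
    transformed_test_dataset_location='gs://my-bucket/out/test',
    transform_func_location='gs://my-bucket/out/transform_fn',
    args=args)
```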

Error encountered:

```
ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs//2022-12-31_00_37_41-4620021112150654208?project=
Traceback (most recent call last):
  File "/Users/santoshsivapurapu/Documents/Projects/0_Sandbox/predict-higgs-boson-collision-events/Notebooks/utils/transformer.py", line 211, in transform_data(train_data_file=TRAIN_DATASET_URI,
  File "/Users/santoshsivapurapu/Documents/Projects/0_Sandbox/predict-higgs-boson-collision-events/Notebooks/utils/transformer.py", line 178, in transformdata = (
  File "/Users/santoshsivapurapu/opt/anaconda3/envs/tfx/lib/python3.9/site-packages/apache_beam/pipeline.py", line 598, in __exit__ self.result.wait_until_finish()
  File "/Users/santoshsivapurapu/opt/anaconda3/envs/tfx/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1673, in wait_until_finish raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 321, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1471, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1482, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 950, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
  File "apache_beam/coders/stream.pyx", line 234, in apache_beam.coders.stream.get_varint_size
TypeError: an integer is required

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute response = task()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction return getattr(self, request_type)(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 321, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1471, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1482, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 950, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
  File "apache_beam/coders/stream.pyx", line 234, in apache_beam.coders.stream.get_varint_size
TypeError: an integer is required [while running 'AnalyzeAndTransformDataset/AnalyzeDataset/InstrumentInputBytes[AnalysisFlattenedPColl]/IncrementCounter/IncrementCounter-ptransform-35']
```

Version info:

- tensorflow 2.8.4
- tensorflow-transform 1.8.0
- tensorflow_data_validation 1.8.0
- tfx-bsl 1.8.0
- Python runtime version: Python 3.9.14
- Apache Beam SDK 2.42.0

zoyahav commented 1 year ago

Please share your definition of `raw_data_feature_spec` as well.

santoshcbit2002 commented 1 year ago

Here is the `raw_data_feature_spec`:

```python
# Raw data schema
raw_data_feature_spec = dict(
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in ordered_csv_columns]
)
```

which evaluates to:

```
{'label': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_1': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_2': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_3': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_4': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_5': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_6': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_7': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_8': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_9': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_10': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_11': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_12': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_13': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_14': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_15': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_16': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_17': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_18': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_19': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_20': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_21': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_22': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_23': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_24': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_25': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_26': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_27': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'feature_28': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None)}
```
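For completeness, here is a small sketch (restating the call already made inside `transform_data`) of how this spec becomes the schema handed to `CsvTFXIO`; the column list is reconstructed from the printed dict above:

```python
import tensorflow as tf
import tensorflow_transform as tft

# Column order reconstructed from the printed spec above.
ordered_csv_columns = ['label'] + [f'feature_{i}' for i in range(1, 29)]

raw_data_feature_spec = {
    name: tf.io.FixedLenFeature([], tf.float32)
    for name in ordered_csv_columns
}

# Same derivation as in transform_data: feature spec -> schema proto.
input_schema = tft.DatasetMetadata.from_feature_spec(raw_data_feature_spec).schema
```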

santoshcbit2002 commented 1 year ago

@zoyahav just a quick update: using pipeline options, I have disabled Dataflow Runner v2 (`'experiments': 'disable_runner_v2'`). This change did the trick and the pipeline completed cleanly. I have no idea why it fails on the v2 runner, though.
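For reference, a minimal sketch of the workaround, passing the experiment flag through `PipelineOptions`; the project, region, and bucket values are hypothetical placeholders:

```python
# Sketch of the workaround: opt out of Dataflow Runner v2 via the
# 'disable_runner_v2' experiment. Project/region/bucket are hypothetical.
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                # hypothetical
    '--region=us-central1',                # hypothetical
    '--temp_location=gs://my-bucket/tmp',  # hypothetical
    '--experiments=disable_runner_v2',
])
```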

robertwb commented 1 year ago

Would it be possible to provide a fully self-contained repro (as runner v2 is becoming the default)?

tvalentyn commented 1 year ago

Friendly ping. Would it be possible to provide a fully self-contained repro? Do you still experience this issue in newer versions of Apache Beam?

tvalentyn commented 1 year ago

Please note that disabling runner v2 will not be possible in a future version of Beam.