ray-project / ray_beam_runner

Ray-based Apache Beam runner
Apache License 2.0

Running TFX tests failed with error #60

Closed wilsonwang371 closed 1 year ago

wilsonwang371 commented 1 year ago

I am running TFX code on Beam on top of Ray and hit the following error. It needs further investigation.

2023-01-11 22:06:21,746 INFO worker.py:1538 -- Started a local Ray instance.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.138460: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
(ray_execute_bundle pid=932030) To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.273821: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:26.273868: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034191: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034290: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=932030) 2023-01-11 22:06:27.034306: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 6 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 90, in process
    self._run_node()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 99, in _run_node
    launcher.Launcher(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 573, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 448, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 112, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 283, in Do
    (example_split
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 198, in run_pipeline
    return self.execute_pipeline(stage_context, stages)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 227, in execute_pipeline
    result = self._run_stage(runner_execution_context, bundle_ctx, queue)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 274, in _run_stage
    ) = self._run_bundle(
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 348, in _run_bundle
    result = ray.get(next(result_generator))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::ray_execute_bundle() (pid=932030, ip=10.189.114.217)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/execution.py", line 113, in ray_execute_bundle
    result_future = worker_handler.control_conn.push(instruction_request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 815, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "apache_beam/runners/common.py", line 884, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/sdf_utils.py", line 62, in __init__
    raise ValueError(
ValueError: Initialize ThreadsafeRestrictionTracker requiresRestrictionTracker.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./feature_skewness.py", line 75, in <module>
    BeamDagRunner().run(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
    return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 297, in run_with_ir
    logging.info('Node %s is scheduled.', node_id)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
    self._run_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 324, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 905, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 90, in process
    self._run_node()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 99, in _run_node
    launcher.Launcher(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 573, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 448, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 112, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 283, in Do
    (example_split
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 198, in run_pipeline
    return self.execute_pipeline(stage_context, stages)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 227, in execute_pipeline
    result = self._run_stage(runner_execution_context, bundle_ctx, queue)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 274, in _run_stage
    ) = self._run_bundle(
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 348, in _run_bundle
    result = ray.get(next(result_generator))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
RuntimeError: ray.exceptions.RayTaskError(ValueError): ray::ray_execute_bundle() (pid=932030, ip=10.189.114.217)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/execution.py", line 113, in ray_execute_bundle
    result_future = worker_handler.control_conn.push(instruction_request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 815, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "apache_beam/runners/common.py", line 884, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/sdf_utils.py", line 62, in __init__
    raise ValueError(
ValueError: Initialize ThreadsafeRestrictionTracker requiresRestrictionTracker. [while running 'Run[FileBasedExampleGen]']

wilsonwang371 commented 1 year ago

@pabloem can you shed some light on this?
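
A note on the ValueError in the traceback above: the message indicates that the object passed to ThreadsafeRestrictionTracker was not recognized as a RestrictionTracker. One plausible cause, which is an assumption and not something confirmed in this thread, is a tracker class that implements the right methods but does not derive from the base class the check expects. The sketch below reproduces that situation with stand-in classes only. RestrictionTracker, ThreadsafeTrackerSketch, describe_type and try_wrap are all hypothetical names defined here; none of this is Beam's implementation.

```python
class RestrictionTracker:
    """Stand-in base class (hypothetical, not the Beam class)."""

    def current_restriction(self):
        raise NotImplementedError


def describe_type(obj):
    """Return the fully qualified class chain of obj, handy when debugging."""
    return " -> ".join(
        f"{cls.__module__}.{cls.__qualname__}" for cls in type(obj).__mro__
    )


class ThreadsafeTrackerSketch:
    """Simplified stand-in for a wrapper that validates its argument."""

    def __init__(self, restriction_tracker):
        # An isinstance check like this rejects duck-typed trackers,
        # even when they expose the same methods.
        if not isinstance(restriction_tracker, RestrictionTracker):
            raise ValueError(
                "wrapper requires a RestrictionTracker, got "
                + describe_type(restriction_tracker)
            )
        self._tracker = restriction_tracker

    def current_restriction(self):
        return self._tracker.current_restriction()


class SubclassedTracker(RestrictionTracker):
    """Derives from the expected base class, so the check passes."""

    def __init__(self, start, stop):
        self._range = (start, stop)

    def current_restriction(self):
        return self._range


class DuckTypedTracker:
    """Same method, but no shared base class, so the check fails."""

    def __init__(self, start, stop):
        self._range = (start, stop)

    def current_restriction(self):
        return self._range


def try_wrap(tracker):
    """Return (ok, detail) instead of raising, to compare both trackers."""
    try:
        wrapped = ThreadsafeTrackerSketch(tracker)
    except ValueError as exc:
        return False, str(exc)
    return True, wrapped.current_restriction()


if __name__ == "__main__":
    print(try_wrap(SubclassedTracker(0, 10)))  # (True, (0, 10))
    print(try_wrap(DuckTypedTracker(0, 10)))   # (False, "<message with class chain>")
```

Printing the class chain of the real tracker object in the same way may show whether it derives from apache_beam.io.iobase.RestrictionTracker or from some other base.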

wilsonwang371 commented 1 year ago

This must have something to do with our internal Beam class that reads Parquet files. After switching from bytebeam.io.ReadFromParquet to apache_beam.io.ReadFromParquet, I now get a different error:

(ray_execute_bundle pid=1067832) 2023-01-12 01:33:00.663969: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
(ray_execute_bundle pid=1067832) To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
(ray_execute_bundle pid=1067832) 2023-01-12 01:33:00.800820: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=1067832) 2023-01-12 01:33:00.800869: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
(ray_execute_bundle pid=1067832) 2023-01-12 01:33:01.542184: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=1067832) 2023-01-12 01:33:01.542287: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
(ray_execute_bundle pid=1067832) 2023-01-12 01:33:01.542303: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 11 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "./feature_skewness.py", line 75, in <module>
    BeamDagRunner().run(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
    return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 297, in run_with_ir
    logging.info('Node %s is scheduled.', node_id)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
    self._run_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 324, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 905, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 90, in process
    self._run_node()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 99, in _run_node
    launcher.Launcher(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 573, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 448, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 112, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 283, in Do
    (example_split
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 198, in run_pipeline
    return self.execute_pipeline(stage_context, stages)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 227, in execute_pipeline
    result = self._run_stage(runner_execution_context, bundle_ctx, queue)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 274, in _run_stage
    ) = self._run_bundle(
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/ray_fn_runner.py", line 348, in _run_bundle
    result = ray.get(next(result_generator))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(UnicodeDecodeError): ray::ray_execute_bundle() (pid=1067832, ip=10.189.114.217)
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1879, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/utils.py", line 102, in dict_to_example
    pyval = pyval.decode(_DEFAULT_ENCODING)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8d in position 6: invalid start byte

During handling of the above exception, another exception occurred:

ray::ray_execute_bundle() (pid=1067832, ip=10.189.114.217)
  File "/home/wilson.wang/ads_infra/ray_beam_runner/ray_beam_runner/portability/execution.py", line 113, in ray_execute_bundle
    result_future = worker_handler.control_conn.push(instruction_request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 817, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1879, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "/home/wilson.wang/miniconda3/lib/python3.8/site-packages/tfx/components/example_gen/utils.py", line 102, in dict_to_example
    pyval = pyval.decode(_DEFAULT_ENCODING)
UnicodeDecodeError: 'utf-8 [while running 'InputToRecord/ToTFExample']' codec can't decode byte 0x8d in position 6: invalid start byte

wilsonwang371 commented 1 year ago

I fixed this issue; it was related to TFX protobuf deserialization.
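
For readers who land here from the same traceback, the failure class can be reproduced in isolation. The sketch below is not the fix that was applied (the thread does not describe it); it only shows why calling `.decode("utf-8")` on binary data, such as serialized protobuf bytes, raises this error. The `payload` value and the `decode_or_keep_bytes` helper are hypothetical and exist only for illustration.

```python
def decode_or_keep_bytes(value: bytes, encoding: str = "utf-8"):
    """Hypothetical helper: return text if the bytes are valid, else the raw bytes."""
    try:
        return value.decode(encoding)
    except UnicodeDecodeError:
        # Binary payloads (for example serialized protos) are not text; keep them as bytes.
        return value


# Hypothetical payload: 0x8d is a UTF-8 continuation byte, so it cannot start a character.
payload = b"abcdef\x8dgh"

try:
    payload.decode("utf-8")
except UnicodeDecodeError as err:
    error_message = str(err)    # same wording as the first UnicodeDecodeError in the traceback
    error_position = err.start  # index of the offending byte

print(error_message)
print(decode_or_keep_bytes(payload))
print(decode_or_keep_bytes(b"hello"))
```

If the bytes reaching `dict_to_example` are binary rather than text, the likelier cause is how the value was serialized or deserialized upstream, not the encoding constant itself, which matches the note above that the root cause was in deserialization.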