tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.github.io/tfx/
Apache License 2.0

BigQueryExampleGen uses cached result for the schema during RowToExample #5127

Closed: mhsong21 closed this issue 1 year ago

mhsong21 commented 2 years ago


System information

Describe the expected behavior

I want the executor to get the schema from a fresh query result, not a cached one. I expect the change to be simple and atomic: just set the cache option to false:

bigquery.job.QueryJobConfig(use_query_cache=False)

Standalone code to reproduce the issue



Other info / logs

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1845, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/example_gen/executor.py", line 48, in RowToExample
    def RowToExample(self, instance: Dict[str, Any]) -> tf.train.Example:
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/utils.py", line 70, in row_to_example
    data_type = field_to_type[key]
KeyError: 'ctx_noti_keywords'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 172, in <module>
    app.run(run_pipeline)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/absl/app.py", line 308, in run
    _run_main(main, args)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/absl/app.py", line 254, in _run_main
    sys.exit(main(argv))
  File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 45, in run_pipeline
    _run_pipeline_local(pipeline, pipeline_config.env_config.local_config)
  File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 56, in _run_pipeline_local
    runner.run(pipeline)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
    return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/local/local_dag_runner.py", line 109, in run_with_ir
    component_launcher.launch()
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 571, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 446, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 98, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 278, in Do
    (example_split
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
    self._run_bundle(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 817, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1845, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/example_gen/executor.py", line 48, in RowToExample
    def RowToExample(self, instance: Dict[str, Any]) -> tf.train.Example:
  File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/utils.py", line 70, in row_to_example
    data_type = field_to_type[key]
KeyError: "ctx_noti_keywords [while running 'InputToRecord/ToTFExample']"
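The last frame shows row_to_example failing on `data_type = field_to_type[key]`, which suggests the row contains a column that is absent from the field-to-type map. The following is a minimal pure-Python sketch of that failure mode, not TFX's code: `build_field_to_type`, `row_to_features`, and the sample schema and row are hypothetical, and the sketch assumes, as the reporter suspects, that the type map was built from a schema that predates the `ctx_noti_keywords` column.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for a query schema: (column_name, column_type) pairs.
Schema = List[Tuple[str, str]]


def build_field_to_type(schema: Schema) -> Dict[str, str]:
    """Map each column name in the schema to its declared type."""
    return {name: col_type for name, col_type in schema}


def row_to_features(row: Dict[str, Any], field_to_type: Dict[str, str]) -> Dict[str, Any]:
    """Pair each value in a row with its schema type.

    A column missing from field_to_type raises KeyError, like the
    `data_type = field_to_type[key]` line in the traceback.
    """
    features = {}
    for key, value in row.items():
        data_type = field_to_type[key]
        features[key] = (data_type, value)
    return features


# Hypothetical schema captured before `ctx_noti_keywords` existed.
stale_schema: Schema = [("user_id", "INTEGER")]
# Hypothetical row returned by the current query, which now includes the new column.
fresh_row = {"user_id": 1, "ctx_noti_keywords": ["promo"]}

try:
    row_to_features(fresh_row, build_field_to_type(stale_schema))
except KeyError as err:
    print("KeyError:", err)  # prints: KeyError: 'ctx_noti_keywords'
```

Any row whose columns are all present in the map converts normally; only the column the stale schema lacks triggers the error.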


1025KB commented 2 years ago

TFX has its own cache: if all input artifacts and exec properties are the same as in a previous run (in this case, the only input is the query), it reuses the result of the last component execution. You can turn off the TFX cache when creating the pipeline object.
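A rough pure-Python model of the caching behaviour described above, offered as an illustration under stated assumptions: `cache_key`, `CachingLauncher`, its `enable_cache` flag, and the `run_example_gen` executor are hypothetical stand-ins, not TFX's implementation. It shows why a component whose only input is an unchanged query string can return a previous result even after the underlying table has changed.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def cache_key(input_artifacts: Dict[str, Any], exec_properties: Dict[str, Any]) -> str:
    """Hash a component's inputs and exec properties into a deterministic key."""
    payload = json.dumps(
        {"inputs": input_artifacts, "exec_properties": exec_properties}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class CachingLauncher:
    """Hypothetical launcher that reuses a previous output when the cache key matches."""

    def __init__(self, enable_cache: bool = True) -> None:
        self.enable_cache = enable_cache
        self.executions = 0  # How many times the executor actually ran.
        self._cache: Dict[str, Any] = {}

    def launch(
        self,
        executor: Callable[[Dict[str, Any]], Any],
        input_artifacts: Dict[str, Any],
        exec_properties: Dict[str, Any],
    ) -> Any:
        key = cache_key(input_artifacts, exec_properties)
        if self.enable_cache and key in self._cache:
            return self._cache[key]  # Cache hit: the executor is skipped entirely.
        self.executions += 1
        result = executor(exec_properties)
        self._cache[key] = result
        return result


# Hypothetical table whose columns change between pipeline runs.
table = {"columns": ["user_id"]}


def run_example_gen(exec_properties: Dict[str, Any]) -> list:
    """Stand-in executor: snapshot the table's current columns."""
    return list(table["columns"])


props = {"query": "SELECT * FROM dataset.table"}  # The only "input" is the query text.

launcher = CachingLauncher(enable_cache=True)
first = launcher.launch(run_example_gen, {}, props)
table["columns"].append("ctx_noti_keywords")  # The table changes; the query text does not.
second = launcher.launch(run_example_gen, {}, props)  # Cache hit: still ['user_id'].

uncached = CachingLauncher(enable_cache=False)
fresh = uncached.launch(run_example_gen, {}, props)  # ['user_id', 'ctx_noti_keywords']
```

Because the key is derived only from the query text, the table change is invisible to the cache; disabling the cache forces the executor to run again and see the new column.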

singhniraj08 commented 2 years ago

@mhsong21,

You can set enable_cache=False when creating the pipeline object in your my_pipeline.py file, as mentioned in the comment above. Please refer to Build a custom pipeline for more information on pipeline customisation.

Thank you!

singhniraj08 commented 1 year ago

Closing this due to inactivity. Please take a look at the answers provided above, and feel free to reopen and post your comments if you still have questions about this. Thank you!
