apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition #25225

Open tomlynchRNA opened 1 year ago

tomlynchRNA commented 1 year ago

What would you like to happen?

Hello,

I am running an Apache Beam pipeline and streaming inserts into Google BigQuery with the Python SDK.

I am facing an issue: because Beam creates each table only once and then stores its name in a list called _KNOWN_TABLES, if the table is deleted while the pipeline is running (after Beam has already created it), further inserts error out with a 404 and no attempt is made to re-create the table.

You can see here where the _create_table_if_needed method returns early if the table is already in _KNOWN_TABLES: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1463
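For context, the early return looks roughly like this (a paraphrased sketch of the linked code, not the verbatim Beam source):

```python
# Paraphrased sketch of BigQueryWriteFn._create_table_if_needed
# (see the link above); simplified, not the verbatim Beam source.
def _create_table_if_needed(self, table_reference, schema=None):
    str_table_reference = '%s:%s.%s' % (
        table_reference.projectId,
        table_reference.datasetId,
        table_reference.tableId)

    # Early return: once a table has been seen, it is never checked
    # or created again for the lifetime of the worker process, even
    # if a third party deletes it while the pipeline is running.
    if str_table_reference in _KNOWN_TABLES:
        return

    # ... otherwise get-or-create the table according to the create
    # disposition, then remember it so we never check again:
    _KNOWN_TABLES.add(str_table_reference)
```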

I have the pipeline's create disposition set to the default, CREATE_IF_NEEDED, so I expect that if a table needed for streaming inserts does not exist, it will be created.
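For reference, the write step in my pipeline is configured roughly like this (project, dataset, table, and schema are placeholders, not my real names):

```python
import apache_beam as beam

# Minimal illustration of the sink configuration; placeholder
# table name and schema stand in for the real ones.
rows = [{'id': 'a', 'value': 1.0}, {'id': 'b', 'value': 2.0}]

with beam.Pipeline() as p:
    (p
     | beam.Create(rows)
     | beam.io.WriteToBigQuery(
         table='my-project:my_dataset.my_table',
         schema='id:STRING,value:FLOAT64',
         # The default create disposition: create the table if missing.
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
```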

I propose implementing a mechanism to allow this behaviour (a rough sketch follows), and I would be willing to make the changes and open a pull request.
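Roughly, what I have in mind is something like the following. This is a sketch only, not an actual patch; `_flush_batch_with_recreate` is a hypothetical wrapper name, and the internal names come from the code linked above:

```python
# Sketch of the proposed mechanism, not an actual patch.
# Idea: when a streaming insert fails with 404 under CREATE_IF_NEEDED,
# evict the table from _KNOWN_TABLES so the existing creation path
# runs again, then retry the insert once.
from google.api_core import exceptions as gcp_exceptions

def _flush_batch_with_recreate(self, destination):  # hypothetical wrapper
    try:
        return self._flush_batch(destination)
    except gcp_exceptions.NotFound:
        if self.create_disposition != BigQueryDisposition.CREATE_IF_NEEDED:
            raise
        # Forget the stale entry so _create_table_if_needed no longer
        # short-circuits on the _KNOWN_TABLES early return.
        _KNOWN_TABLES.discard(destination)
        self._create_table_if_needed(
            bigquery_tools.parse_table_reference(destination))
        return self._flush_batch(destination)
```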

Looking forward to your thoughts, Tom

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

ahmedabu98 commented 1 year ago

Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here?

> if the table is deleted while the pipeline is running after Beam having already created it, further inserts will error out with a 404 and not attempt to re-create it.

tomlynchRNA commented 1 year ago

> Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here?
>
> > if the table is deleted while the pipeline is running after Beam having already created it, further inserts will error out with a 404 and not attempt to re-create it.

stack trace, quite big:

```
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
    return self._flush_batch(destination)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
    ignore_unknown_values=self.ignore_unknown_columns)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
    ignore_unknown_values=ignore_unknown_values)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
    timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
    return call()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle
    element.timer_family_id, timer_data)
  File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.process_timer
  File "apache_beam/runners/common.py", line 1453, in apache_beam.runners.common.DoFnRunner.process_user_timer
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1450, in apache_beam.runners.common.DoFnRunner.process_user_timer
  File "apache_beam/runners/common.py", line 579, in apache_beam.runners.common.DoFnInvoker.invoke_user_timer
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
    return self._flush_batch(destination)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
    ignore_unknown_values=self.ignore_unknown_columns)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
    ignore_unknown_values=ignore_unknown_values)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
    timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
    return call()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED [while running 'Write Custom Vars to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-66']
```

chamikaramj commented 1 year ago

I think that in general, for Beam source/sink I/O, we assume the data store resources being read or written are not deleted by third parties while the pipeline is running. Trying to add this as a feature to Beam I/O in general would probably need a lot of re-work (even though we might be able to fix this particular instance).

Also, I think CREATE_IF_NEEDED (as it's currently defined for the Beam BigQuery sink) means that tables will be created once per pipeline, not that tables will be re-created if they are deleted at any stage of the pipeline.

chamikaramj commented 1 year ago

That said, I'm OK with getting this fix in if we are clear that it does not modify the guarantees offered by CREATE_IF_NEEDED (i.e. the pipeline may still fail or get stuck if output tables get deleted by third parties during execution).