apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Python WriteToBigQuery with File Loads and dynamic table destinations sometimes removes temp tables before the copy finishes or is triggered. #23670

Open · rizenfrmtheashes opened 2 years ago

rizenfrmtheashes commented 2 years ago

What happened?

We believe that in the Python SDK's GCP IO library, when using WriteToBigQuery with file loads and dynamic table destinations, in rare scenarios temp tables get removed by the RemoveTempTables ptransform before the prior copy jobs finish or are even kicked off.

We've only seen this occur under heavy load (many millions of rows) and high parallelism (Beam 2.40, Dataflow Runner v2, autoscaling from 1 to ~40 n1-standard-4 instances).

As a note, although we are on Beam 2.40, we are running the current master-branch version of this file here, which contains the latest BigQuery file loads fixes from PR #23012, applied via a copy/paste patch.

We encounter stack traces like

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 598, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1004, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/steps/bigquery_file_loads_patch_40.py", line 610, in process
    self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 635, in wait_for_bq_job
    job_reference.jobId, job.status.errorResult))
RuntimeError: BigQuery job beam_bq_job_COPY_redactednameofjobhere31945c91d69d91e3019c4b1789457ff0ff7c289120221014000701099659_COPY_STEP_311_98bf2821aecda3f10bbc5b6cf26716f0 failed. Error Result: <ErrorProto
 message: 'Not found: Table REDACTED_GCP_PROJECT_NAME:REDACTED_BQ_DATASET.beam_bq_job_LOAD_prodmetricsstreaming31945c91d69d91e3019c4b1789457ff0ff7c289120221014000701099659_LOAD_STEP_838_5318a60f33ee48e464b734da26b8c43f_01f82b6914fb40bdb98be33b7dd446f4'
 reason: 'notFound'> [while running 'Write [REDACTED] to BigQuery File Loads/BigQueryBatchFileLoads/ParDo(TriggerCopyJobs)/ParDo(TriggerCopyJobs)-ptransform-24']

As a note, bigquery_file_loads_patch_40.py is just a copy/pasted version of the SDK's io/gcp bigquery_file_loads.py that we use to backport fixes from newer versions of Beam (like #23012). We did dependency checking to make sure the backported fixes were okay.

You can likely reproduce this with a pipeline like the one in this doc here (used for a prior bug report), sending millions of rows through it instead.
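For reference, here is a minimal sketch of that kind of pipeline (streaming FILE_LOADS with a per-element table destination). The project, dataset, subscription, schema, and routing function are illustrative placeholders, not the pipeline from the linked doc.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def route_to_table(row):
    # Hypothetical routing: pick a destination table from a field on the element.
    return 'my-project:my_dataset.events_%s' % row['event_type']


with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (
        p
        | 'Read' >> beam.io.ReadFromPubSub(
            subscription='projects/my-project/subscriptions/my-sub')
        | 'Parse' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8')))
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            table=route_to_table,  # dynamic destinations take the temp table + copy job route
            schema='event_type:STRING,payload:STRING',
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=60,  # seconds between load job triggers in streaming mode
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))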

Issue Priority

Priority: 2

Issue Component

Component: io-py-gcp

rizenfrmtheashes commented 2 years ago

.add-labels gcp,io,python

rizenfrmtheashes commented 2 years ago

It's entirely possible that during an autoscale event work items get replayed, and that the third-party side effects from the copy jobs and the remove jobs get replayed with them, specifically for work items that trigger a remove job and then get reset because they didn't reach the end of the pipeline and weren't considered "committed". Regardless, there should be a way to handle/track this, or to make sure that load to temp table -> copy to destination -> remove temp table happens as a single commit phase while tolerating duplication.

tvalentyn commented 2 years ago

cc: @ahmedabu98

ahmedabu98 commented 2 years ago

or make sure that the load to temp table -> copy to destination -> remove temp table happens as a single commit phase

this is most likely not feasible because of the GBK here. I'm wondering if that GBK is necessary (I think BQ ignores delete ops for non-existent tables, so sending more than one delete request for a table should be fine). @pabloem WDYT? The GBK is an old addition, but I thought I'd still ask.
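For what it's worth, here's a minimal sketch of what an idempotent delete looks like client-side, using the standalone google-cloud-bigquery client rather than the apitools wrapper in apache_beam.io.gcp.bigquery_tools (so this is an assumption about the approach, not what BigQueryWrapper actually does): with not_found_ok=True, a duplicate delete request for an already-removed table becomes a no-op, which is the behavior you'd need if the GBK were dropped.

from google.cloud import bigquery

client = bigquery.Client()

def delete_temp_table(table_ref):
    # A 404 ("table already deleted") is swallowed instead of raised,
    # so sending more than one delete request per temp table is harmless.
    client.delete_table(table_ref, not_found_ok=True)

# Hypothetical usage with a Beam-style temp table name (placeholder):
# delete_temp_table("my-project.my_dataset.beam_bq_job_LOAD_...")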

ahmedabu98 commented 2 years ago

Although, the GBK does maintain a boundary for retries here. The load to temp table and copy to destination steps are fusible, and delete tables is its own separate step, so any retries happening while deleting temp tables would not trigger a retry of the load and copy steps.

ahmedabu98 commented 2 years ago

@rizenfrmtheashes does this only happen with dynamic destinations? Also can you check your log timestamps and see if these errors start showing up near an autoscale event?

rizenfrmtheashes commented 2 years ago

I think it only happens with dynamic destinations because there aren't any remove-table steps with static destinations. That path basically loads the data directly into the destination table instead of going through a temp-table intermediate step (there is some logic where data exceeding certain size thresholds requires partitioning and falls outside this, but that's an edge case).

I just checked the production job where this occurs, and the last autoscaling event happened a little less than 2 hours before the first log line reporting this error. So it's likely not autoscaling. I didn't see a one-off instance re-allocation during that time either.

So any retries happening while deleting temp tables would not trigger a retry for the load and copy steps.

After re-reading the code there, I agree: the GBK isn't in the right place for the replay phenomenon I'm describing to occur.

I'll keep digging on my end to find more smoking guns.

ahmedabu98 commented 2 years ago

there aren't any remove table steps in static destinations

So you're right that the load to temp --> copy --> delete temp route is always chosen for dynamic destinations. However, this route is also triggered by large loads that don't fit in a single load job, so I'm wondering whether this issue is unique to dynamic destinations or affects all writes that take this alternative route.

rizenfrmtheashes commented 2 years ago

You're right. I haven't reproduced (or had the time to reproduce) the case you're describing, but I doubt it's isolated to dynamic table destinations. I suspect the error also occurs for large direct loads like the ones you describe, since the code paths in the two situations aren't very different.

rizenfrmtheashes commented 1 year ago

I want to bump/note that I still experience this issue, even on Beam 2.48 (I continue to use a patched version). I have had to launch new kinds of pipelines lately, also writing to BQ via file loads and dynamic table destinations, and I still encounter this issue.

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1459, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 562, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 567, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1731, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 594, in finish_bundle
    self.bq_wrapper.wait_for_bq_job(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 656, in wait_for_bq_job
    raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_COPY_devREDACTEDv0001SERVICE_REDACTEDsessionstreamingdee97770f8ae41cda27e46ed9ad361dbe411c41020230719133458358779_COPY_STEP_803_be4d9436fd109617c17082aaf3b8be8d failed. Error Result: <ErrorProto
 message: 'Not found: Table PROJECT_REDACTED:867395ee_660f_4f8d_89b9_51052234406b.beam_bq_job_LOAD_devREDACTEDv0001SERVICE_REDACTEDsessionstreamingdee97770f8ae41cda27e46ed9ad361dbe411c41020230719133458358779_LOAD_STEP_187_048806eef0c9a1f1281a7b31aa85d4ff_b6eba9ce9ae24e6d8d23f15692ce5f01'
 reason: 'notFound'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 667, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1067, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 939, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 942, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 943, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1461, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1508, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1459, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 562, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 567, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1731, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 594, in finish_bundle
    self.bq_wrapper.wait_for_bq_job(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 656, in wait_for_bq_job
    raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_COPY_devREDACTEDv0001SERVICE_REDACTEDsessionstreamingdee97770f8ae41cda27e46ed9ad361dbe411c41020230719133458358779_COPY_STEP_803_be4d9436fd109617c17082aaf3b8be8d failed. Error Result: <ErrorProto
 message: 'Not found: Table PROJECT_REDACTED:867395ee_660f_4f8d_89b9_51052234406b.beam_bq_job_LOAD_devREDACTEDv0001SERVICE_REDACTEDsessionstreamingdee97770f8ae41cda27e46ed9ad361dbe411c41020230719133458358779_LOAD_STEP_187_048806eef0c9a1f1281a7b31aa85d4ff_b6eba9ce9ae24e6d8d23f15692ce5f01'
 reason: 'notFound'> [while running 'Write BQ Sessioned Events to BigQuery/BigQueryBatchFileLoads/ParDo(TriggerCopyJobs)/ParDo(TriggerCopyJobs)-ptransform-24']

It would be helpful to know if this is being explored/actioned, because otherwise I have to keep using my patch that never deletes the temp tables, which has given me no issues.

This thread's suspicion was that some retry before a group-by commit barrier was causing work items to get replayed, with third-party side effects causing these temp tables to be deleted and therefore missing on the subsequent retries, but we couldn't figure out where that retry loop could be happening such that the temp tables wouldn't be regenerated or renamed. I wonder if there's a mix-up in the mapping of job names to tables?

Also, as a note, I only saw this happening when I drained a job. That might be a red herring.
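For anyone relying on the same no-delete workaround mentioned above, here is a rough sketch (placeholders only, not our actual tooling) of cleaning up the leftover beam_bq_job_LOAD_* temp tables out of band with the google-cloud-bigquery client. You'd want to add an age check before pointing this at a live pipeline, since a copy job may still need a recent temp table.

from google.cloud import bigquery

PROJECT = "my-project"   # placeholder
DATASET = "my_dataset"   # placeholder: the dataset holding the temp tables

client = bigquery.Client(project=PROJECT)

for table in client.list_tables(f"{PROJECT}.{DATASET}"):
    # Temp tables created by BigQueryBatchFileLoads use this prefix (see the
    # "Not found: Table ... beam_bq_job_LOAD_..." errors above).
    if table.table_id.startswith("beam_bq_job_LOAD_"):
        client.delete_table(f"{PROJECT}.{DATASET}.{table.table_id}", not_found_ok=True)
        print(f"deleted {table.table_id}")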