apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS #21061

Open damccorm opened 2 years ago

damccorm commented 2 years ago

WriteToBigQuery fails when using the FILE_LOADS method in the BundleBasedDirectRunner.

The issue appears to be in wait_for_bq_job, where the function expects job_reference to be an actual JobReference instance and not a string. However, the WaitForBQJobs DoFn appears to be passing a string as the argument. I believe this is during the copy step, and I'm not calling this code directly (so unfortunately I can't just pass a TableReference instance myself).

Here is a traceback:

 


request_worker_1      | ERROR:root:Traceback (most recent call last):
request_worker_1      |   File "/app/main.py", line 209, in process_message
request_worker_1      |     construct_and_run_pipeline(request)
request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
request_worker_1      |     return result.wait_until_finish()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
request_worker_1      |     raise t(v).with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
request_worker_1      |     self.attempt_call(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
request_worker_1      |     evaluator.process_element(value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
request_worker_1      |     self.runner.process(element)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
request_worker_1      |     self._reraise_augmented(exn)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
request_worker_1      |     raise new_exn.with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
request_worker_1      |     self._invoke_process_per_window(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
request_worker_1      |     self.process_method(*args_for_process),
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
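
For readers who want to see the failure in isolation, the AttributeError at the bottom of the traceback can be reproduced without Beam. This is only a sketch: the JobReference dataclass below is a hypothetical stand-in for the real BigQuery message type, job_fields copies the attribute access from the failing line, and require_job_reference is a made-up guard, not part of apache_beam. The project and job values are placeholders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobReference:
    """Hypothetical stand-in for BigQuery's JobReference message."""
    projectId: str
    jobId: str
    location: Optional[str] = None


def job_fields(job_reference):
    # Same attribute access as the failing line in wait_for_bq_job.
    return job_reference.projectId, job_reference.jobId, job_reference.location


def require_job_reference(value):
    """Hypothetical guard: fail early with a clearer message than AttributeError."""
    needed = ("projectId", "jobId", "location")
    if not all(hasattr(value, name) for name in needed):
        raise TypeError(
            f"expected a JobReference-like object, got {type(value).__name__}: {value!r}"
        )
    return value


# A real reference passes the guard and exposes all three fields.
ref = JobReference(projectId="my-project", jobId="job-123", location="US")
fields = job_fields(require_job_reference(ref))

# A bare string triggers the same kind of error as in the traceback.
try:
    job_fields("job-123")
except AttributeError as exc:
    error_message = str(exc)
```

A guard like this would not fix the underlying bug (the DoFn would still receive a string), but it shows where a type check could turn the opaque AttributeError into a message that names the unexpected value.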

 

Here is the WriteToBigQuery step that is failing (note that the callable passed for table returns a TableReference instance):


WriteToBigQuery(
    table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    ignore_insert_ids=True,
    method="FILE_LOADS",  # using STREAMING_INSERTS 'fixes' the issue
    batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
    schema=schema,
)

 

Note that this issue does not occur when using the standard DirectRunner, nor does it occur when using the STREAMING_INSERTS method.
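
Until the bug is fixed, one stopgap is to choose the write method based on the runner in use. The helper below is a hypothetical sketch, not a Beam API: choose_bq_write_method and its arguments are invented for illustration, and it encodes only the two observations reported in this issue.

```python
def choose_bq_write_method(runner_name, preferred="FILE_LOADS"):
    """Hypothetical helper: avoid FILE_LOADS on the runner where it was seen to fail."""
    # Per this report, FILE_LOADS fails only under BundleBasedDirectRunner.
    if runner_name == "BundleBasedDirectRunner" and preferred == "FILE_LOADS":
        return "STREAMING_INSERTS"
    return preferred


# The returned string would be passed as method=... to WriteToBigQuery.
method_for_bundle_runner = choose_bq_write_method("BundleBasedDirectRunner")
method_for_direct_runner = choose_bq_write_method("DirectRunner")
```

Streaming inserts and file loads differ in cost, quotas, and delivery behavior, so this is a workaround for local testing rather than a general substitute.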

Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)

Imported from Jira BEAM-12659. Original Jira may contain additional context. Reported by: milesmcc.

blazingbhavneek commented 1 year ago

Hey there! 👋 I'm new to this repository and eager to contribute! 🌟 Could you kindly suggest some entry point or files to look into?

damccorm commented 1 year ago

Hey, I saw you added this comment in several places. I'd recommend focusing on a single issue at first (I answered the underlying question here: https://github.com/apache/beam/issues/20298#issuecomment-1547888993).