apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: BigQueryIO coding issues when using withFormatFunction() #25122

Open bvolpato opened 1 year ago

bvolpato commented 1 year ago

What happened?

BigQueryIO has coder issues on a very specific codepath that combines withFormatFunction() and getFailedInsertsWithErr(). The issues only surface when using Dataflow Runner v2.

In short, I have a PCollection<byte[]> that is written to BigQuery through BigQueryIO (using Streaming Inserts), with .withFormatFunction() converting each element into a TableRow. When withExtendedErrorInfo is enabled and the output of getFailedInsertsWithErr is piped to another BigQueryIO, the underlying coder is somehow still ByteArrayCoder (the original input's coder, not a TableRow coder), so FnApiDoFnRunner cannot consume the affected TableRows correctly.


Code to reproduce: https://gist.github.com/bvolpato/a5f3f1f44071eafc1034935bc4fffbae

Cause:

java.lang.ClassCastException: class com.google.api.services.bigquery.model.TableRow cannot be cast to class [B (com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:53)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:30)
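
To illustrate the mechanism behind this ClassCastException, here is a minimal plain-Java sketch that does not use Beam at all. The names Coder, BytesCoder, RowCoder, InfoCoder and tryEncode are hypothetical stand-ins invented for this example (they are not the Beam classes), and a Map is used in place of TableRow. It assumes the failure comes from a wrapper coder delegating to an element coder whose type does not match the runtime value, which generics erasure lets through until the cast inside encode.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class CoderMismatchSketch {

  /** Hypothetical stand-in for a coder interface; not the real Beam Coder. */
  interface Coder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  /** Plays the role of ByteArrayCoder: it can only encode byte[]. */
  static final class BytesCoder implements Coder<byte[]> {
    @Override
    public void encode(byte[] value, OutputStream out) throws IOException {
      out.write(value);
    }
  }

  /** Plays the role of a row coder: encodes a map as UTF-8 text. */
  static final class RowCoder implements Coder<Map<String, Object>> {
    @Override
    public void encode(Map<String, Object> value, OutputStream out) throws IOException {
      out.write(value.toString().getBytes(StandardCharsets.UTF_8));
    }
  }

  /** Plays the role of TableRowInfoCoder: delegates to an element coder. */
  static final class InfoCoder<T> implements Coder<T> {
    private final Coder<T> elementCoder;

    InfoCoder(Coder<T> elementCoder) {
      this.elementCoder = elementCoder;
    }

    @Override
    public void encode(T value, OutputStream out) throws IOException {
      // After erasure nothing checks the type here; the cast happens in the delegate.
      elementCoder.encode(value, out);
    }
  }

  /** Wraps the given element coder, encodes the value, and reports the outcome. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static String tryEncode(Coder elementCoder, Object value) {
    Coder wrapped = new InfoCoder(elementCoder);
    try {
      wrapped.encode(value, new ByteArrayOutputStream());
      return "ok";
    } catch (ClassCastException e) {
      return "ClassCastException";
    } catch (IOException e) {
      return "IOException";
    }
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1);
    // The element is a row, but the coder is still the byte[] one.
    System.out.println(tryEncode(new BytesCoder(), row)); // prints "ClassCastException"
    // With a coder that matches the element type, encoding succeeds.
    System.out.println(tryEncode(new RowCoder(), row)); // prints "ok"
  }
}
```

The mismatch compiles because the element type is erased, so it only shows up when an element is actually encoded. That would be consistent with the pipeline failing at runtime rather than at construction.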

Full stack trace:

ERROR [main] (MonitoringUtil.java:84) - 2023-01-23T17:44:23.179Z: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo@1b992c76' with coder 'TableRowInfoCoder'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds.processElement(TagWithUniqueIds.java:68)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable.processElement(GenerateShardedTable.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:126)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at com.sample.CreateByteToBigQueryDLQ$1.processElement(CreateByteToBigQueryDLQ.java:104)
    at com.sample.CreateByteToBigQueryDLQ$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$FinishBundleArgumentProvider$Context.output(FnApiDoFnRunner.java:2139)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:298)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1773)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:133)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:556)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo@1b992c76' with coder 'TableRowInfoCoder'.
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:461)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
Caused by: java.lang.ClassCastException: class com.google.api.services.bigquery.model.TableRow cannot be cast to class [B (com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:53)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.encode(TableRowInfoCoder.java:30)
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:130)
    at org.apache.beam.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:37)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:461)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds.processElement(TagWithUniqueIds.java:68)
    at org.apache.beam.sdk.io.gcp.bigquery.TagWithUniqueIds$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable.processElement(GenerateShardedTable.java:44)
    at org.apache.beam.sdk.io.gcp.bigquery.GenerateShardedTable$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:126)
    at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2240)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:813)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at com.sample.CreateByteToBigQueryDLQ$1.processElement(CreateByteToBigQueryDLQ.java:104)
    at com.sample.CreateByteToBigQueryDLQ$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$FinishBundleArgumentProvider$Context.output(FnApiDoFnRunner.java:2139)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:298)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1773)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:133)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:556)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

reuvenlax commented 1 year ago

This issue might be in CreateByteToBigQueryDLQ. I can't find the code for that function anywhere.

reuvenlax commented 1 year ago

AFAIK the bug is that Beam did not infer a Coder for the toTableRow PCollection, so the user needs to specify a Coder there with setCoder. However, this should have caused an error on job submission; the fact that it didn't might be a bug in the portable job-submission codepath.