apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

"Dead letter" handling for problem rows in BigQueryIO Storage Write API #21247

Open damccorm opened 2 years ago

damccorm commented 2 years ago

A single invalid row causes the BigQueryIO transform, and with it the whole pipeline, to fail. The desired behavior is configurable error handling: either fail on any validation failure (the current behavior) or return the list of failed records through the WriteResult.

The exception can occur in two places: the JSON-to-protobuf conversion and the BigQuery backend.
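
To make the requested behavior concrete, here is a minimal plain-Java sketch of the two error-handling modes. It is not Beam code: `DeadLetterSketch`, `ErrorMode`, `FailedRow`, `Result`, `convert`, `append` and `row` are all hypothetical names, and the two checks only stand in for the conversion step and the backend validation mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names are Beam or BigQuery APIs. */
final class DeadLetterSketch {

  /** How the writer reacts to a row it cannot write. */
  enum ErrorMode { FAIL_FAST, DEAD_LETTER }

  /** A rejected row together with the reason it was rejected. */
  static final class FailedRow {
    final Map<String, Object> row;
    final String error;

    FailedRow(Map<String, Object> row, String error) {
      this.row = row;
      this.error = error;
    }
  }

  /** Rows that were written, plus rows that were diverted instead of failing the run. */
  static final class Result {
    final List<Map<String, Object>> written = new ArrayList<>();
    final List<FailedRow> failed = new ArrayList<>();
  }

  /** Stand-in for the client-side conversion step: rejects a non-integer bytes_sent. */
  static void convert(Map<String, Object> row) {
    if (!(row.get("bytes_sent") instanceof Integer)) {
      throw new IllegalArgumentException("bytes_sent: expected INTEGER");
    }
  }

  /** Stand-in for backend validation: rejects a dst_ip longer than 15 characters. */
  static void append(Map<String, Object> row) {
    Object ip = row.get("dst_ip");
    if (ip instanceof String && ((String) ip).length() > 15) {
      throw new IllegalArgumentException(
          "dst_ip: maximum length 15, got " + ((String) ip).length());
    }
  }

  /** Writes every row; a rejected row either aborts the call or lands in Result.failed. */
  static Result write(List<Map<String, Object>> rows, ErrorMode mode) {
    Result result = new Result();
    for (Map<String, Object> row : rows) {
      try {
        convert(row);
        append(row);
        result.written.add(row);
      } catch (IllegalArgumentException e) {
        if (mode == ErrorMode.FAIL_FAST) {
          throw e; // behavior reported in this issue: one bad row fails everything
        }
        result.failed.add(new FailedRow(row, e.getMessage()));
      }
    }
    return result;
  }

  /** Small helper to build a test row. */
  static Map<String, Object> row(Object bytesSent, String dstIp) {
    Map<String, Object> r = new HashMap<>();
    r.put("bytes_sent", bytesSent);
    r.put("dst_ip", dstIp);
    return r;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row(120, "10.0.0.1"));          // valid
    rows.add(row("120", "10.0.0.2"));        // fails the conversion stand-in
    rows.add(row(7, "0123456789abcdef0"));   // fails the backend stand-in
    Result result = write(rows, ErrorMode.DEAD_LETTER);
    for (FailedRow f : result.failed) {
      System.out.println("dead letter: " + f.error);
    }
  }
}
```

In `DEAD_LETTER` mode the caller decides what to do with `failed` (log it, write it to a side table, and so on), while `FAIL_FAST` keeps the existing all-or-nothing semantics.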

Example of the exception caused by the conversion:


io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at
D586b3f9a_1543_4dbe_87ff_ef786d6803c2.bytes_sent, the proto field type string, BigQuery field type INTEGER
Entity: projects/event-processing-demo/datasets/bigquery_io/tables/events/streams/Cic2MzUyMTYxYy0wMDAwLTI2MjktOGVjYy1mNDAzMDQ1ZWY5Y2U6czI

Example of the exception caused by the BigQuery backend: 


io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Field dst_ip: STRING(15) has maximum length 15 but
got a value with length 54 Entity: projects/event-processing-demo/datasets/bigquery_io/tables/events/streams/CiQ2MzRkOGM5Mi0wMDAwLTI2MjktOGVjYy1mNDAzMDQ1ZWY5Y2U

Imported from Jira BEAM-13158. Original Jira may contain additional context. Reported by: slilichenko.

Abacn commented 4 months ago

This issue claims that a single problem row can cause the pipeline to fail, while issue #25233 reports the opposite: failed rows get silently dropped.

I find that both are true. In fact, from at least Beam 2.50.0 through the current version (2.57.0-SNAPSHOT), the pipeline can non-deterministically fail or pass on an invalid element that causes a row write to fail.

In some trials, failed rows are passed to a "flattenErrors" step, which outputs them as a PCollection.

In other trials, a bundle failed with an error like:

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA failed with Status Code INVALID_ARGUMENT. The stream may not exist.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
    at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Append to stream projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA failed with Status Code INVALID_ARGUMENT. The stream may not exist.
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
    at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
    at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Cannot convert value to DATE, value: 6585321 on field date. Entity: projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
    at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
    at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.census.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:814)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.census.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:494)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Cannot convert value to DATE, value: 6585321 on field date. Entity: projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA
    at io.grpc.Status.asRuntimeException(Status.java:533)
    ... 22 more
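
In the trace above, the outermost message ("The stream may not exist") is generic, and the actionable detail ("Cannot convert value to DATE ... on field date") only appears at the bottom of the cause chain. A minimal sketch of pulling that innermost cause out programmatically, using only `java.lang.Throwable`; the class and method names are hypothetical, and the messages in `main` are abbreviated stand-ins for the ones in the trace:

```java
/** Hypothetical helper; not part of Beam or the BigQuery client. */
final class RootCauseSketch {

  /** Follows getCause() links until reaching a throwable with no further cause. */
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  public static void main(String[] args) {
    // Nested exceptions mimicking the shape of the trace: generic wrapper, specific cause.
    Throwable backend =
        new IllegalStateException("INVALID_ARGUMENT: Cannot convert value to DATE");
    Throwable wrapper =
        new RuntimeException("Append to stream failed. The stream may not exist.", backend);
    System.out.println(rootCause(wrapper).getMessage());
  }
}
```

Logging the root cause next to the wrapper message makes it easier to tell a genuinely missing stream apart from a bad row.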