GoogleCloudPlatform / DataflowTemplates

Cloud Dataflow Google-provided templates for solving in-Cloud data tasks
https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
Apache License 2.0

[Bug]: PubSub_Proto_to_BigQuery is broken in 2022-12-05-00_RC00 #538

Closed: theyoprst closed this issue 1 year ago

theyoprst commented 1 year ago

Related Template(s)

PubSub_Proto_to_BigQuery

What happened?

The new version of the template throws exceptions that are retried endlessly, and no output is written to BigQuery. The old version works fine.

New version: gs://dataflow-templates/2022-12-05-00_RC00/flex/PubSub_Proto_to_BigQuery
Old version: gs://dataflow-templates/2022-11-16-00_RC00/flex/PubSub_Proto_to_BigQuery

The template parameters we are using are:

    "protoSchemaPath" = "gs://<cropped>"
    "tempLocation" = "gs://<cropped>"
    "stagingLocation" = "gs://<cropped>"
    "fullMessageName" = <cropped>
    "inputSubscription" = <cropped>
    "outputTopic" = <cropped>
    "outputTableSpec" = <cropped>
    "usePublicIps" = false

As far as I understand the traceback, the problem is with a time field, and the only such field in the failing message is served_timestamp, which has type google.protobuf.Timestamp.
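A google.protobuf.Timestamp carries two values: `seconds` since the Unix epoch and a `nanos` adjustment. The minimal sketch below, assuming Java 16+ for records, shows how the `served_timestamp` values from the log output of this issue map to a `java.time.Instant` (the Java type named in the exception) and back. `ProtoTimestamp` is a hypothetical stand-in for illustration, not the generated protobuf class and not the template's code.

```java
import java.time.Instant;

public class TimestampToInstant {

    /** Hypothetical stand-in for google.protobuf.Timestamp (seconds + nanos). */
    record ProtoTimestamp(long seconds, int nanos) {}

    /** Converts the (seconds, nanos) pair to a java.time.Instant. */
    static Instant toInstant(ProtoTimestamp ts) {
        return Instant.ofEpochSecond(ts.seconds(), ts.nanos());
    }

    /** Converts an Instant back to its (seconds, nanos) representation. */
    static ProtoTimestamp fromInstant(Instant instant) {
        return new ProtoTimestamp(instant.getEpochSecond(), instant.getNano());
    }

    public static void main(String[] args) {
        // Values copied from served_timestamp in the worker log of this issue.
        ProtoTimestamp served = new ProtoTimestamp(1670943311L, 135642000);
        Instant instant = toInstant(served);
        System.out.println(instant);                              // 2022-12-13T14:55:11.135642Z
        System.out.println(fromInstant(instant).equals(served));  // true
    }
}
```

The round trip is lossless because both representations keep nanosecond precision; the failure in this issue is about which of the two representations reaches the coder, not about the values themselves.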

Beam Version

Newer than 2.43.0

Apache Beam SDK for Java 2.43.0

Relevant log output

Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'global_entity_id: "XX_XX"
served_timestamp {
  seconds: 1670943311
  nanos: 135642000
}
micro_cost: 1
timezone: "America/New_York"
session_id: "session-id"
hit_match_id: "<cropped>"
placement: "<cropped>"
campaign_id: "1"
device_id: "<cropped>"
type: "<cropped>"
advertiser_id: "<cropped>"
advertiser_type: "<cropped>"
' with coder 'SchemaCoder<Schema: Fields:
Field{name=error, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=3}}}}
Field{name=global_entity_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=1}}}}
Field{name=campaign_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=19}}}}
Field{name=device_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=20}}}}
Field{name=session_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=16}}}}
Field{name=hit_match_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=17}}}}
Field{name=placement, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=18}}}}
Field{name=micro_cost, description=, type=INT64 NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=14}}}}
Field{name=timezone, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=15}}}}
Field{name=type, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=22}}}}
Field{name=advertiser_id, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=23}}}}
Field{name=advertiser_type, description=, type=STRING NOT NULL, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=24}}}}
Field{name=served_timestamp, description=, type=LOGICAL_TYPE, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=10}}}}
Field{name=replayed_timestamp, description=, type=LOGICAL_TYPE, options={{beam:option:proto:meta:number=Option{type=INT32 NOT NULL, value=11}}}}
Encoding positions:
{global_entity_id=1, device_id=3, timezone=8, session_id=4, advertiser_type=11, error=0, micro_cost=7, type=9, advertiser_id=10, hit_match_id=5, replayed_timestamp=13, served_timestamp=12, placement=6, campaign_id=2}
Options:{{beam:option:proto:meta:type_name=Option{type=STRING NOT NULL, value=event.DisplayEvent}}}UUID: 44db6468-0cda-436d-994c-e746ffdc5981  UUID: 44db6468-0cda-436d-994c-e746ffdc5981 delegateCoder: org.apache.beam.sdk.coders.Coder$ByteBuddy$xjQgaOv4@7ac6b45c'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
        org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        org.apache.beam.sdk.transforms.MapElements$MapWithFailures$MapFn.processElement(MapElements.java:337)
Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class org.apache.beam.sdk.values.Row (java.time.Instant is in module java.base of loader 'bootstrap'; org.apache.beam.sdk.values.Row is in unnamed module of loader 'app')
        org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
        org.apache.beam.sdk.schemas.SchemaCoderHelpers$LogicalTypeCoder.encode(SchemaCoderHelpers.java:89)
        org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:333)
        org.apache.beam.sdk.coders.Coder$ByteBuddy$xjQgaOv4.encode(Unknown Source)
        org.apache.beam.sdk.coders.Coder$ByteBuddy$xjQgaOv4.encode(Unknown Source)
        org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
        org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        org.apache.beam.sdk.transforms.MapElements$MapWithFailures$MapFn.processElement(MapElements.java:337)
        org.apache.beam.sdk.transforms.MapElements$MapWithFailures$MapFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
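The `Caused by` section shows a `java.time.Instant` reaching `SchemaCoder.encode` via `SchemaCoderHelpers$LogicalTypeCoder.encode`, where a `Row` was expected. One plausible reading, not confirmed in this thread, is that the logical-type value for the timestamp field was handed to the base-type coder without first being converted to its base representation. The Beam-independent sketch below illustrates that pattern; `LogicalType`, `SecondsNanos`, `encodeBase`, `encodeLogical` and `encodeLogicalBuggy` are hypothetical names for illustration only, not Beam classes or methods (assumes Java 16+).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.time.Instant;

public class LogicalTypeEncoding {

    /** Hypothetical logical type: a user-facing input type plus a storage (base) type. */
    interface LogicalType<InputT, BaseT> {
        BaseT toBaseType(InputT input);
    }

    /** Hypothetical base representation, standing in for a (seconds, nanos) Row. */
    record SecondsNanos(long seconds, int nanos) {}

    /** Instant -> (seconds, nanos) conversion. */
    static final LogicalType<Instant, SecondsNanos> NANOS_INSTANT =
        i -> new SecondsNanos(i.getEpochSecond(), i.getNano());

    /** Encodes only the base type, like a coder built for the base schema. */
    static void encodeBase(Object value, DataOutputStream out) throws IOException {
        SecondsNanos base = (SecondsNanos) value; // ClassCastException if given an Instant
        out.writeLong(base.seconds());
        out.writeInt(base.nanos());
    }

    /** Correct pattern: convert the input value to the base type, then delegate. */
    static byte[] encodeLogical(Instant value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            encodeBase(NANOS_INSTANT.toBaseType(value), out);
        }
        return bytes.toByteArray();
    }

    /** Buggy pattern: passes the Instant straight to the base-type encoder. */
    static byte[] encodeLogicalBuggy(Instant value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            encodeBase(value, out);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Instant served = Instant.ofEpochSecond(1670943311L, 135642000);
        System.out.println(encodeLogical(served).length); // 12 (8-byte long + 4-byte int)
        try {
            encodeLogicalBuggy(served);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, similar to the worker log");
        }
    }
}
```

Because the stack trace shows the failure inside a byte-size observer on the output of `MapElements`, every element with a populated timestamp would fail the same way, which is consistent with the endless retries described in the report.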
theyoprst commented 1 year ago

Amendment regarding the Beam version:

Apache Beam SDK for Java 2.43.0

sgulbetekin commented 1 year ago

FYI: I just raised an internal bug with the Dataflow team.

Abacn commented 1 year ago

This is caused by apache/beam#24870, and a fix in Beam has been found.

bvolpato commented 1 year ago

A fix is coming in 2.44.0. Thanks @ahmedabu98 and @Abacn.

After the release, it can take a couple of days until we can validate and roll out a template with the fix. Please hold on and we'll keep you posted.

bvolpato commented 1 year ago

Beam 2.44.0 has been released. This should already be fixed.