apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: org.apache.avro.UnresolvedUnionException: Not in union after updating schema-registry #30647

Open vinicius0197 opened 7 months ago

vinicius0197 commented 7 months ago

What happened?

We have a Dataflow pipeline running Apache Beam 2.48.0. The pipeline consumes data from a Kafka topic using KafkaIO, with Avro values deserialized against schema-registry. Yesterday we updated the schema for this topic (we added a new field) without issues. Today, some 12 hours after the change, we started receiving errors in our Dataflow pipeline (stacktrace below).
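For context, the consumer is wired up roughly like this. This is a minimal sketch rather than our actual code; the broker, topic, registry URL, and subject name are placeholders:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafkaSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // The value schema (and the AvroGenericCoder derived from it) is fetched
    // from the registry while the pipeline graph is being built.
    PCollection<KafkaRecord<String, GenericRecord>> records =
        p.apply(
            KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers("broker:9092")       // placeholder
                .withTopic("my-topic")                     // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                        "http://schema-registry:8081",     // placeholder
                        "my-topic-value")));               // placeholder subject

    p.run().waitUntilFinish();
  }
}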

Stacktrace

Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.kafka.KafkaRecord@a6e3f803}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(KafkaRecordCoder(org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder@8f8a9b01,org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder@ffb6aacf))'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:295)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:286)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1404)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:154)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1044)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null","long"]: mytable

Looking at the org.apache.avro.UnresolvedUnionException: Not in union ["null","long"]: mytable line, what I find interesting is that the schema registry entry looks like this (I'm showing just the last few lines):

 "tags": null,
  "__table": {
    "string": "mytable"
  },
  "__lsn": {
    "long": 1683624739725992
  },

The tags field is the new field that was added to the schema registry yesterday. It looks like the AvroCoder tried to encode the __table field, whose value is the string "mytable", against the type of the next field in the schema (which is a long).
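For what it's worth, the following standalone snippet reproduces the exact exception under one assumption: the Kafka deserializer decoded the record with the new schema, while the coder re-encoded it with the old one, reading fields by position. This is speculation on my part about the mechanism, and the Row schema below is illustrative rather than our real one:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;

public class UnionMismatchRepro {
  public static void main(String[] args) throws Exception {
    // Old schema, as baked into the pipeline's coder at launch time.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"__table\",\"type\":[\"null\",\"string\"]},"
            + "{\"name\":\"__lsn\",\"type\":[\"null\",\"long\"]}]}");

    // New schema, as served by the registry after the update: "tags" sits
    // before __table, shifting the positions of the later fields by one.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"tags\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"__table\",\"type\":[\"null\",\"string\"]},"
            + "{\"name\":\"__lsn\",\"type\":[\"null\",\"long\"]}]}");

    // A record as decoded by the Kafka deserializer with the new schema.
    GenericRecord record = new GenericData.Record(newSchema);
    record.put("tags", null);
    record.put("__table", "mytable");
    record.put("__lsn", 1683624739725992L);

    // Re-encoding that record with the old schema reads fields by position,
    // so "mytable" lands on the ["null","long"] union and this throws:
    // org.apache.avro.UnresolvedUnionException: Not in union ["null","long"]: mytable
    new GenericDatumWriter<GenericRecord>(oldSchema)
        .write(record, EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null));
  }
}

Assuming tags really was added before __table, as the snippet above suggests, every later field shifts by one position, which matches the string-where-a-long-should-be pattern in the stacktrace.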

This is not the first time we've added a new field to a schema in the schema registry, and we haven't had issues before.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

johnjcasey commented 1 week ago

This looks to me like an issue with the coder in use not updating to reflect the new field. That would make sense to me, as I don't believe Beam supports dynamically changing coders.
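Concretely, the schema is resolved once, when the coder is created during pipeline construction, and is serialized into the job graph; workers never re-fetch it. A minimal sketch (the inline schema here is hypothetical):

import org.apache.avro.Schema;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;

public class CoderPinningSketch {
  public static void main(String[] args) {
    // Hypothetical stand-in for the schema fetched from the registry while
    // the pipeline graph is being constructed.
    Schema schemaAtLaunch = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"__table\",\"type\":[\"null\",\"string\"]}]}");

    // The coder is fixed with this exact schema for the life of the job,
    // even if the registry later serves a newer schema version.
    AvroGenericCoder coder = AvroCoder.of(schemaAtLaunch);
    System.out.println(coder.getSchema());
  }
}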

I'm a bit surprised this has worked cleanly in the past. Can you provide an example of a change that has worked?