
[SUPPORT] org.apache.avro.SchemaParseException: Can't redefine decimal field #10983

Open junkri opened 5 months ago

junkri commented 5 months ago

Describe the problem you faced

When using decimal types, I ran into a problem where Hudi cannot write into a non-empty table, failing with:

Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: <field>

I can trigger this error in two ways.

To Reproduce

I created a small runnable GitHub project with 2 small examples that trigger this error: https://github.com/junkri/hudi-cant-redefine-field-demo. You can run the examples with Maven or from any IDE.

spark.sql(s"""
           create table trick(
            one struct<tricky decimal(10,2)>,
            two struct<tricky decimal(19,6)>
            )
            using hudi
            location '$location'
           """)

  spark.sql("""
             insert into trick (one, two)
             values (
               named_struct('tricky', 1.2),
               named_struct('tricky', 3.4)
             )
             """) // works fine

  spark.sql("""
             insert into trick (one, two)
             values (
               named_struct('tricky', 5.6),
               named_struct('tricky', 7.8)
             )
             """) // org.apache.avro.SchemaParseException: Can't redefine: tricky

Expected behavior

I expect to be able to use decimal fields in different structs without issues.

Environment Description

I mainly use AWS EMR Serverless, so I chose the versions from the latest EMR 6 environment.

Additional context

I am aware of https://github.com/apache/hudi/issues/7717, but here I don't use very complex structures; in my case it is the decimal fields that cause the issue. I tried forcing an upgrade of the parquet-avro library in my project, but it didn't help.

I tried debugging into Hudi, and I saw that when it reads data back from Parquet and converts it to Avro, the decimal fields are created as the fixed Avro type, with an empty namespace attribute! Since fixed is a named type, it can only be defined once per name in the whole Avro schema and must be referenced afterwards. But my decimal fields share the same name while having different precision/scale settings, so the size attribute of their fixed types has to differ, and that is why the field cannot be "redefined".
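If that is what happens, the clash is reproducible with plain Avro, independent of Hudi and Parquet. Here is a minimal sketch (the fixed sizes of 5 and 9 bytes for decimal(10,2) and decimal(19,6) are my assumption about what the conversion produces; only the fact that they differ matters):

  import java.util.Arrays.asList
  import org.apache.avro.Schema

  // Two *different* fixed schemas that share the name "tricky" (no namespace).
  val dec1 = Schema.createFixed("tricky", null, null, 5)
  val dec2 = Schema.createFixed("tricky", null, null, 9)

  val one = Schema.createRecord("one", null, null, false,
    asList(new Schema.Field("tricky", dec1, null, null)))
  val two = Schema.createRecord("two", null, null, false,
    asList(new Schema.Field("tricky", dec2, null, null)))

  val root = Schema.createRecord("trick", null, null, false,
    asList(new Schema.Field("one", one, null, null),
           new Schema.Field("two", two, null, null)))

  // Serializing the schema registers named types as it walks the tree; the
  // second fixed called "tricky" has a different size, so it cannot be a
  // reference to the first one and Avro throws:
  // org.apache.avro.SchemaParseException: Can't redefine: tricky
  root.toString(true)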

In our projects we use Kafka as the input source, and there we define decimal fields with the bytes Avro type (not fixed), i.e. something like {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 6}. Maybe the parquet-avro library should use that representation as well?
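For comparison, bytes is an unnamed Avro type, so no name registration happens and two same-named decimal fields with different precision/scale coexist fine. A quick check (the nested record names one_t and two_t are made up for the example):

  import org.apache.avro.Schema

  // bytes-backed decimals: bytes is not a named type, so the same field name
  // with different precision/scale causes no redefinition.
  val schema = new Schema.Parser().parse(
    """{"type": "record", "name": "trick", "fields": [
      |  {"name": "one", "type": {"type": "record", "name": "one_t", "fields": [
      |    {"name": "tricky", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}]}},
      |  {"name": "two", "type": {"type": "record", "name": "two_t", "fields": [
      |    {"name": "tricky", "type": {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 6}}]}}
      |]}""".stripMargin)

  println(schema.toString(true)) // serializes back without "Can't redefine"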

Stacktrace

Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: tricky
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
    ... 30 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: tricky
    at org.apache.avro.Schema$Names.put(Schema.java:1586)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:844)
    at org.apache.avro.Schema$FixedSchema.toJson(Schema.java:1315)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema.toString(Schema.java:433)
    at org.apache.avro.Schema.toString(Schema.java:405)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:77)
    at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:162)
    at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94)
    at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:73)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:126)
    ... 33 more
ad1happy2go commented 5 months ago

Thanks @junkri for raising this. We will look into it.

ad1happy2go commented 5 months ago

@junkri This looks different from https://github.com/apache/hudi/issues/7717, as that one is fixed with later Spark versions whose dependencies include this fix (https://issues.apache.org/jira/browse/PARQUET-1441).

This case is still failing with Spark 3.4 and Hudi 0.14.1. Created a JIRA to track a fix: https://issues.apache.org/jira/browse/HUDI-7602

junkri commented 5 months ago

Thank you very much for creating a JIRA issue for this. I also found out that the same error is triggered when a decimal field and a struct field share the same name, so this also fails:

 spark.sql(s"""
           create table trick(
            tricky struct<tricky decimal(10,2)>
            )
            using hudi
            location '$location'
           """)

  spark.sql("""
             insert into trick
             values (
               named_struct('tricky', 1.2)
             )
             """) // works fine

  spark.sql("""
             insert into trick
             values (
               named_struct('tricky', 3.4)
             )
             """) // org.apache.avro.SchemaParseException: Can't redefine: tricky

I suspect this happens for the same reason: during the Parquet-to-Avro conversion the decimal is represented as a fixed type with an empty namespace, so its name collides with the record name of the enclosing struct.
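Under that assumption, it is the same naming clash as before, only between a record and a fixed this time. A minimal sketch (the 5-byte size for decimal(10,2) is again an assumption):

  import java.util.Arrays.asList
  import org.apache.avro.Schema

  // The struct column would become an Avro record named "tricky", and the
  // decimal inside it a fixed that, with an empty namespace, is *also*
  // named "tricky": two different named schemas competing for one name.
  val fixedDec = Schema.createFixed("tricky", null, null, 5) // size assumed
  val inner = Schema.createRecord("tricky", null, null, false,
    asList(new Schema.Field("tricky", fixedDec, null, null)))
  val root = Schema.createRecord("trick", null, null, false,
    asList(new Schema.Field("tricky", inner, null, null)))

  // Fails the same way on serialization:
  // org.apache.avro.SchemaParseException: Can't redefine: tricky
  root.toString(true)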