apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Can't redefine array #11807

Open Ytimetravel opened 3 weeks ago

Ytimetravel commented 3 weeks ago

Dear community, I found an error when using Hudi. If a table uses the array<struct<>> type, then every time I add columns and write afterwards, it throws an exception. The error message is as follows:

[screenshot: exception message]

Has anyone encountered this issue? How should it be resolved?

Steps to reproduce the behavior:

1. Create the table:

    CREATE TABLE IF NOT EXISTS db.table (
      p_id bigint,
      record_items array<struct<name:string,value:int>>,
      edit_items array<struct<name:string,value:int>>,
      texts array<struct<text_content:string,text_style:string>>,
      p_date string
    ) using hudi
    PARTITIONED BY (p_date)
    options (
      type = 'mor',
      primaryKey = 'p_id',
      payloadClass = 'org.apache.hudi.common.model.PartialUpdateAvroPayload',
      'hoodie.bucket.index.num.buckets' = '4'
    );

2. Insert a row:

    insert into table db.table
    select
      130928195078 as p_id,
      array(named_struct("name", "John", "value", 10), named_struct("name", "wangwu", "value", 5), named_struct("name", "Bob", "value", 7)) as record_items,
      array(named_struct("name", "zhangsan", "value", 10), named_struct("name", "Jane", "value", 5), named_struct("name", "lisi", "value", 7)) as edit_items,
      null as texts,
      '20240810' as p_date;

3. Add a column:

    alter table db.table add columns(is_new_col string);

4. Insert again, now with the new column:

    insert into table db.table
    select
      130928195078 as p_id,
      array(named_struct("name", "John", "value", 10), named_struct("name", "wangwu", "value", 5), named_struct("name", "Bob", "value", 7)) as record_items,
      array(named_struct("name", "zhangsan", "value", 10), named_struct("name", "Jane", "value", 5), named_struct("name", "lisi", "value", 7)) as edit_items,
      null as texts,
      'newcol' as is_new_col,
      '20240810' as p_date;

Environment Description

Stacktrace:

    org.apache.spark.SparkException: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:394)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:394)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:400)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$2(BaseSparkCommitActionExecutor.java:310)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:873)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:873)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:355)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1184)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1158)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1093)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1158)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:884)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:355)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:306)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
        at org.apache.spark.scheduler.Task.run(Task.scala:124)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:959)
    Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: array
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:439)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:421)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:387)
        ... 31 more
    Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
        at org.apache.avro.Schema$Names.put(Schema.java:1128)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema.toString(Schema.java:324)
        at org.apache.avro.Schema.toString(Schema.java:314)
        at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:162)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:73)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:126)
        ... 35 more

danny0405 commented 3 weeks ago

From the source code, it looks like there are multiple fields named "array" in the schema. Can you share the table creation schema after the alter table operation? Here is the logic I found in Avro's Schema class:

    // org.apache.avro.Schema.Names#put: a name may be registered only once;
    // binding a second, different schema to an existing name fails.
    public Schema put(Name name, Schema schema) {
      if (containsKey(name))
        throw new SchemaParseException("Can't redefine: " + name);
      return super.put(name, schema);
    }
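
To illustrate the failure mode danny0405 is pointing at: if two structurally different array columns end up with element records that share the same generated name "array", serializing the combined schema trips exactly the check above. A minimal, hypothetical sketch with plain Avro (assuming Avro 1.9+ for this Schema.Field constructor; the class name and field names are illustrative, mirroring the table in the report):

    import java.util.Arrays;

    import org.apache.avro.Schema;

    public class RedefineArrayDemo {
      public static void main(String[] args) {
        // Two different element records that both carry the default name
        // "array" (standing in for how struct elements of array columns
        // can be named during schema conversion).
        Schema recordItemsElem = Schema.createRecord("array", null, null, false,
            Arrays.asList(
                new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null),
                new Schema.Field("value", Schema.create(Schema.Type.INT), null, null)));
        Schema textsElem = Schema.createRecord("array", null, null, false,
            Arrays.asList(
                new Schema.Field("text_content", Schema.create(Schema.Type.STRING), null, null),
                new Schema.Field("text_style", Schema.create(Schema.Type.STRING), null, null)));

        Schema table = Schema.createRecord("table", null, "db", false,
            Arrays.asList(
                new Schema.Field("record_items", Schema.createArray(recordItemsElem), null, null),
                new Schema.Field("texts", Schema.createArray(textsElem), null, null)));

        // toString() serializes the schema to JSON. The first record registers
        // the name "array"; the second, non-equal record then reaches Names.put
        // and throws org.apache.avro.SchemaParseException: Can't redefine: array
        System.out.println(table);
      }
    }

Under the same assumptions, giving the two element records distinct names (or namespaces) lets toString() succeed, which is why unique generated record names matter here.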

ad1happy2go commented 3 weeks ago

@Ytimetravel Is it possible to upgrade your Spark version to 3.2? This issue may be related to https://issues.apache.org/jira/browse/PARQUET-1441.

Ytimetravel commented 3 weeks ago

@danny0405 Sorry, I'm not sure if I understand correctly. The following image shows the schema-related metadata after the alter column operation.

[screenshot: schema metadata after the alter table operation]

Ytimetravel commented 3 weeks ago

@ad1happy2go We're currently still using Spark 2 internally. Thank you very much for the link. I'll check whether it's relevant.