apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] org.apache.avro.SchemaParseException: Can't redefine: array when there are top-level variables, Struct and Array[struct] (no complex datatype within array[struct]) #7717

Open abhishekshenoy opened 1 year ago

abhishekshenoy commented 1 year ago

Describe the problem you faced

When storing a data structure with the following layout into a copy-on-write table:

root
 |-- personDetails: struct (nullable = true)
 |    |-- id: integer (nullable = false)
 |-- idInfo: struct (nullable = true)
 |    |-- adhaarId: integer (nullable = false)
 |-- addressInfo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addressId: integer (nullable = false)
 |-- employmentInfo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- employmenCd: integer (nullable = false)
 |-- src_load_ts: timestamp (nullable = false)
 |-- load_ts: timestamp (nullable = false)
 |-- load_dt: date (nullable = false)

the first write will succeed, but then subsequent writes will fail with the error included in the stacktrace.

To Reproduce

Steps to reproduce the behavior:

  case class Person(personDetails: PersonDetails,
                    idInfo: IdInfo,
                    addressInfo: Array[AddressInfo] = Array.empty[AddressInfo],
                    employmentInfo: Array[EmploymentInfo] = Array.empty[EmploymentInfo])

  case class PersonDetails(id: Int)

  case class IdInfo(adhaarId: Int)

  case class AddressInfo(addressId: Int)

  case class EmploymentInfo(employmenCd: Int)

  def maskedParquetBugTest(spark: SparkSession): Unit = {

    import spark.implicits._

    // Note: timestampInCst is not defined in the snippet as posted; the full runbook later in
    // this thread defines it as current_timestamp(), so it is repeated here for completeness.
    val timestampInCst = current_timestamp()

    val personDetails1 = PersonDetails(1)
    val idInfo1 = IdInfo(1)
    val addressInfo1 = AddressInfo(1)
    val employmentInfo1 = EmploymentInfo(1)

    val item1 = Person(personDetails1, idInfo1, Array(addressInfo1), Array(employmentInfo1))
    val parquetBugDs = Seq(item1).toDF()
      .withColumn("src_load_ts", current_timestamp())
      .withColumn("load_ts", timestampInCst).withColumn("load_dt", to_date(col("load_ts")))

    parquetBugDs.printSchema()

    writeHudi(parquetBugDs, "parquet_bug_ds",
      "load_dt",
      "personDetails.id",
      "src_load_ts")
  }

  def writeHudi(ds: DataFrame, tableName: String, partitionPath: String, recordKey: String, precombineKey: String): Unit = {

    val hoodieConfigs: java.util.Map[String, String] = new java.util.HashMap[String, String]
    hoodieConfigs.put("hoodie.table.name", tableName)
    hoodieConfigs.put("hoodie.datasource.write.keygenerator.class", classOf[SimpleKeyGenerator].getName)
    hoodieConfigs.put("hoodie.datasource.write.partitionpath.field", partitionPath)
    hoodieConfigs.put("hoodie.datasource.write.recordkey.field", recordKey)
    hoodieConfigs.put("hoodie.datasource.write.precombine.field", precombineKey)
    hoodieConfigs.put("hoodie.payload.ordering.field", precombineKey)
    hoodieConfigs.put("hoodie.index.type", "GLOBAL_SIMPLE")
    hoodieConfigs.put("hoodie.insert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.upsert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.delete.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.simple.index.update.partition.path", "false")
    hoodieConfigs.put("hoodie.datasource.write.payload.class", classOf[DefaultHoodieRecordPayload].getName)
    hoodieConfigs.put("hoodie.datasource.write.hive_style_partitioning", "false")
    hoodieConfigs.put("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL)
    hoodieConfigs.put("hoodie.datasource.write.row.writer.enable", "true")
    hoodieConfigs.put("hoodie.combine.before.upsert", "true")
    hoodieConfigs.put("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled", "true")
    hoodieConfigs.put("hoodie.schema.on.read.enable", "true")
    hoodieConfigs.put("hoodie.datasource.write.reconcile.schema", "true")
    hoodieConfigs.put("hoodie.datasource.write.operation", "upsert")

    ds.toDF().write.format("hudi").
      options(hoodieConfigs).
      mode("append").
      save(s"/tmp/data/hudi/$tableName")
  }

  maskedParquetBugTest(spark)

  maskedParquetBugTest(spark)

Expected behavior

The second write succeeds.

Environment Description

Hudi version (hudi-spark3.1-bundle_2.12) : 0.12.2, 0.12.1, 0.12.0

Spark version : 3.1.3

Hive version : -

Hadoop version : -

Storage (HDFS/S3/GCS..) : Local storage

Running on Docker? (yes/no) : No

Stack Trace

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:693)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:345)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest$.writeHudi(HudiParquetBugTest.scala:99)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest$.maskedParquetBugTest(HudiParquetBugTest.scala:68)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest$.delayedEndpoint$com$walmart$hnw$datafoundations$techmod$ingestion$utils$HudiParquetBugTest$1(HudiParquetBugTest.scala:32)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest$delayedInit$body.apply(HudiParquetBugTest.scala:14)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest$.main(HudiParquetBugTest.scala:14)
    at com.test.run.hudi.upsert.complex.utils.HudiParquetBugTest.main(HudiParquetBugTest.scala)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: array
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:166)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
    at org.apache.avro.Schema$Names.put(Schema.java:1550)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:813)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:975)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1137)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1242)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1003)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:987)
    at org.apache.avro.Schema.toString(Schema.java:426)
    at org.apache.avro.Schema.toString(Schema.java:398)
    at org.apache.avro.Schema.toString(Schema.java:389)
    at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
    at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:69)
    at org.apache.hudi.io.storage.HoodieFileReader.getRecordIterator(HoodieFileReader.java:43)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
danny0405 commented 1 year ago

@jonvex Can you take a look at this issue?

jonvex commented 1 year ago

Ok, so this seems to be the same issue as https://github.com/apache/hudi/issues/2657

Here is my runbook that you can use in spark-shell. I used Spark 3.1.3 and Hudi 0.12.2:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.DataFrame
import java.util.Map
import java.util.HashMap
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import spark.implicits._

  val timestampInCst = current_timestamp()

  case class PersonDetails(id: Int)

  case class IdInfo(adhaarId: Int)

  case class AddressInfo(addressId: Int)

  //case class EmploymentInfo(employmenCd: Int)

//    case class Person(personDetails: PersonDetails,
//                     idInfo: IdInfo,
//                     addressInfo: Array[AddressInfo] = Array.empty[AddressInfo],
//                     employmentInfo: Array[EmploymentInfo] = Array.empty[EmploymentInfo])

    case class Person(personDetails: PersonDetails,
                idInfo: IdInfo,
                addressInfo: Array[AddressInfo] = Array.empty[AddressInfo])

  def writeHudi(ds: DataFrame, tableName: String, partitionPath: String, recordKey: String, precombineKey: String, basePath: String): Unit = {

    val hoodieConfigs: java.util.Map[String, String] = new java.util.HashMap[String, String]
    hoodieConfigs.put("hoodie.table.name", tableName)
    hoodieConfigs.put("hoodie.datasource.write.keygenerator.class", classOf[SimpleKeyGenerator].getName)
    hoodieConfigs.put("hoodie.datasource.write.partitionpath.field", partitionPath)
    hoodieConfigs.put("hoodie.datasource.write.recordkey.field", recordKey)
    hoodieConfigs.put("hoodie.datasource.write.precombine.field", precombineKey)
    hoodieConfigs.put("hoodie.payload.ordering.field", precombineKey)
    hoodieConfigs.put("hoodie.index.type", "GLOBAL_SIMPLE")
    hoodieConfigs.put("hoodie.insert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.upsert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.delete.shuffle.parallelism", "1")
    hoodieConfigs.put("hoodie.simple.index.update.partition.path", "false")
    hoodieConfigs.put("hoodie.datasource.write.payload.class", classOf[DefaultHoodieRecordPayload].getName)
    hoodieConfigs.put("hoodie.datasource.write.hive_style_partitioning", "false")
    hoodieConfigs.put("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL)
    hoodieConfigs.put("hoodie.datasource.write.row.writer.enable", "true")
    hoodieConfigs.put("hoodie.combine.before.upsert", "true")
    hoodieConfigs.put("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled", "true")
    hoodieConfigs.put("hoodie.schema.on.read.enable", "true")
    hoodieConfigs.put("hoodie.datasource.write.reconcile.schema", "true")
    hoodieConfigs.put("hoodie.datasource.write.operation", "upsert")

    ds.toDF().write.format("hudi").
      options(hoodieConfigs).
      mode("append").
      save(basePath)
  }

  def maskedParquetBugTest(spark: SparkSession, tableName: String, basePath: String): Unit = {

    import spark.implicits._

    val personDetails1 = PersonDetails(1)
    val idInfo1 = IdInfo(1)
    val addressInfo1 = AddressInfo(1)
    //val employmentInfo1 = EmploymentInfo(1)

    //val item1 = Person(personDetails1, idInfo1, Array(addressInfo1), Array(employmentInfo1))
    val item1 = Person(personDetails1, idInfo1, Array(addressInfo1))
    val parquetBugDs = Seq(item1).toDF().
      withColumn("src_load_ts", current_timestamp()).
      withColumn("load_ts", timestampInCst).
      withColumn("load_dt", to_date(col("load_ts")))

    parquetBugDs.printSchema()

    writeHudi(parquetBugDs, tableName, "load_dt", "personDetails.id", "src_load_ts", basePath)

    spark.read.format("hudi").load(basePath).show(false)
  }

val tableName = "tbl11"
val basePath = s"/tmp/issue7717/$tableName"
maskedParquetBugTest(spark, tableName, basePath)
maskedParquetBugTest(spark, tableName, basePath)
danny0405 commented 1 year ago

So cc @jonvex, you mean the Spark SQL write still fails with the latest master code, and this is not quite what is described in #2657, which said that only the Hive query fails?

jonvex commented 1 year ago

Yes. It is not exactly the same issue. What I meant is I think the root cause is the same, and it can be solved by upgrading parquet-avro.
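
For reference, a minimal sketch of forcing a newer parquet-avro onto the job classpath for anyone who has to stay on Spark 3.1.x (assuming an sbt-built job; the version shown is illustrative and should match the Parquet version bundled by your Spark/Hudi distribution):

  // Hypothetical build.sbt fragment: override the parquet-avro pulled in transitively.
  // "1.12.3" is only an example; pick the version that matches your Spark distribution.
  dependencyOverrides ++= Seq(
    "org.apache.parquet" % "parquet-avro" % "1.12.3"
  )

Passing an explicit newer parquet-avro jar to spark-shell via --jars should have the same effect for the quick repro above.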

ad1happy2go commented 1 year ago

@abhishekshenoy We were able to reproduce this error with Spark 3.1.3, but the same code works with Spark 3.2; the Hudi version used for both is the same.

So, as @jonvex mentioned, it is a parquet-avro library version issue. Do you still have a problem, or can we close this issue?
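
If it helps to confirm which parquet-avro the runtime actually picked up, a quick check from spark-shell (a sketch, assuming the Hudi bundle has already put parquet-avro on the driver classpath):

  // Prints the jar that provides AvroReadSupport, i.e. the parquet-avro version in use.
  // getCodeSource can be null for boot-classpath classes, hence the Option wrapper.
  println(Option(classOf[org.apache.parquet.avro.AvroReadSupport]
    .getProtectionDomain.getCodeSource).map(_.getLocation))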

voonhous commented 1 year ago

A detailed explanation of this error can be found here:

https://github.com/apache/hudi/issues/6849#issuecomment-1661734683

Jonathanrodrigr12 commented 8 months ago

Hi guys, I have the same problem, but I am using the HoodieMultiTableStreamer.

Description

I have a lot of parquet files, and all of them have this struct (see the attached screenshot).

The first time I run the job in EMR Serverless the data is saved, but on the second attempt I get this error:

Caused by: org.apache.avro.SchemaParseException: Can't redefine: value
    at org.apache.avro.Schema$Names.put(Schema.java:1586)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:844)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1011)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1173)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023)
    at org.apache.avro.Schema.toString(Schema.java:433)
    at org.apache.avro.Schema.toString(Schema.java:405)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:73)

Expected behavior

The second write succeeds.

Environment Description

Hudi version : hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar

Spark version : 3.4.1

EMR : 6.15.0

Stack Trace

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375) at org.apache.spark.rdd.RDD.iterator(RDD.scala:326) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: value at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335) ... 
30 more Caused by: org.apache.avro.SchemaParseException: Can't redefine: value at org.apache.avro.Schema$Names.put(Schema.java:1586) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:844) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1011) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1173) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) at org.apache.avro.Schema.toString(Schema.java:433) at org.apache.avro.Schema.toString(Schema.java:405) at org.apache.avro.Schema.toString(Schema.java:396) at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:73) at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:162) at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94) at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:73) at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:126) ... 33 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2974) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2910) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2909) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2909) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1263) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1263) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.15.jar:?] 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1263) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3173) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3112) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3101) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2271) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2366) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1172) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.withScope(RDD.scala:405) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.fold(RDD.scala:1166) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.DoubleRDDFunctions.$anonfun$sum$1(DoubleRDDFunctions.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at scala.runtime.java8.JFunction0$mcD$sp.apply(JFunction0$mcD$sp.java:23) ~[scala-library-2.12.15.jar:?] 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.withScope(RDD.scala:405) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:804) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:446) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.sync(HoodieMultiTableStreamer.java:456) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.main(HoodieMultiTableStreamer.java:281) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_392] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_392] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_392] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_392] at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1066) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1158) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1167) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:326) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392] Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: value at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:326) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392] Caused by: org.apache.avro.SchemaParseException: Can't redefine: value at org.apache.avro.Schema$Names.put(Schema.java:1586) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:844) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1011) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1173) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1278) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1039) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:1023) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema.toString(Schema.java:433) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema.toString(Schema.java:405) ~[avro-1.11.1.jar:1.11.1] at org.apache.avro.Schema.toString(Schema.java:396) ~[avro-1.11.1.jar:1.11.1] at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:73) ~[org.apache.parquet_parquet-avro-1.12.3.jar:1.12.3] at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:162) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94) 
~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:73) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:126) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:387) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:369) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259) ~[hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:326) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392]

ad1happy2go commented 8 months ago

What Hudi and Spark versions are you using, @Jonathanrodrigr12?

Jonathanrodrigr12 commented 8 months ago

I am using Spark 3.4.1 and Hudi 0.14.0. I am also running on EMR Serverless with this release image: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-6150-release.html

junkri commented 7 months ago

@Jonathanrodrigr12 I think I ran into the same problem as you. I can see in your screenshot that you have a field called "value" defined multiple times, once as a decimal and once as a struct. I've just raised a separate issue covering this; please check whether it covers your situation as well: https://github.com/apache/hudi/issues/10983
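
For context, a hypothetical Spark schema of the shape described above (column names are made up for illustration), where a nested field named "value" appears once as a decimal and once as a struct; see #10983 for the detailed analysis:

  import org.apache.spark.sql.types._

  // Illustrative only: two unrelated structs that both carry a nested field named "value",
  // once as a decimal and once as a struct.
  val priceStruct = StructType(Seq(StructField("value", DecimalType(10, 2))))
  val detailStruct = StructType(Seq(StructField("value", StructType(Seq(StructField("id", IntegerType))))))
  val schema = StructType(Seq(
    StructField("price", priceStruct),
    StructField("detail", detailStruct)
  ))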

ad1happy2go commented 7 months ago

@Jonathanrodrigr12 Did you also have multiple "value" columns across structs? This may be the same as the issue raised by @junkri (https://github.com/apache/hudi/issues/10983) and not this original issue.