apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[BUG] Example from Hudi Quick start doesn't work! #9141

Closed chandu-1101 closed 1 year ago

chandu-1101 commented 1 year ago

Describe the problem you faced

I am trying to merge CDC JSON data into a snapshot. To do this, I first loaded a DataFrame from the existing Parquet files (the snapshot folder) and tried to write it to S3 in Hudi format. I get the error below.

  1. I am running in spark-shell with 3 executors, each with 3 GB memory and 1 core. The driver has 1 core and 1 GB memory.
  2. Below is the code, marked where it is failing.

To Reproduce

import org.apache.hudi.QuickstartUtils
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.Function
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.SaveMode.Overwrite

    val snapshotDf = Application.spark().read.parquet("s3://bucket/snapshots-test/dbdump/_bid_9223370348443853913/")
    val cdcSchema = SparkUtils.getSchema("s3://bucket/schemas/dbdump-schema.json")
    val cdcDf = Application.spark().read.schema(cdcSchema).json("s3://bucket/inputs/dbdump/")
    /* done */

    /* merge them */
    snapshotDf.registerTempTable("snapshot");
    val snapshotDf2 = Application.spark().sql("select * from snapshot where cdc_oid is not null and cdc_oid !='' ")
    val snapshotDf3 = snapshotDf2.withColumn("hash", lit(col("cdc_oid").hashCode() %1000) )

/* BELOW is taken from Quick Start guide of HUDI : https://hudi.apache.org/docs/quick-start-guide */
    snapshotDf3.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
      .option(TBL_NAME.key(), "GE11")
      .mode(Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11/snapshot");
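
A note on the `hash` column above: `col("cdc_oid").hashCode()` is evaluated once on the driver, against the `Column` object rather than each row's value, so `lit(...)` stamps the same constant on every row and the table ends up with a single partition value. Whether this contributes to the failure is not established in this thread. Below is a minimal sketch of a per-row alternative, assuming a local SparkSession and a made-up four-row DataFrame; the object name, the helper `withBucket`, the sample keys, and the bucket count are illustrative and not part of the original job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

object PerRowBucketSketch {
  // Adds a bucket column computed per row: Spark's hash() over the key column,
  // folded into [0, buckets) with pmod so the value is never negative.
  def withBucket(df: DataFrame, keyCol: String, buckets: Int): DataFrame =
    df.withColumn("hash", pmod(hash(col(keyCol)), lit(buckets)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("bucket-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the snapshot rows; only the key column matters here.
    val df = Seq("a1", "b2", "c3", "d4").toDF("cdc_oid")

    // Driver-side hashCode of the Column object: one literal shared by all rows.
    val constant = df.withColumn("hash", lit(col("cdc_oid").hashCode() % 1000))
    println(s"distinct driver-side values: ${constant.select("hash").distinct().count()}")

    // Per-row hash: the value depends on each row's cdc_oid.
    withBucket(df, "cdc_oid", 1000).show()

    spark.stop()
  }
}
```

The design choice here is `pmod` instead of `%`, so that bucket values stay non-negative and the number of partitions equals the chosen bucket count.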

Steps to reproduce the behavior:

  1. Run the above program

Expected behavior

  1. The Hudi table should have been created from the snapshot Parquet files.
  2. The CDC data should then have been merged into it, but the write failed before that step was reached.

Stacktrace

07-07 14:11:10  WARN DAGScheduler: Broadcasting large task binary with size 1033.6 KiB
07-07 14:12:24  ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)

chandu-1101 commented 1 year ago

Any update? Is the above explanation missing something important? If I modify the above program to the one below, it returns in a few seconds (the source Parquet file is 40 GB). When I check the destination, I see only some .hoodie folders and nothing more.

    snapshotDf3.write.format("hudi")
//      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.name())
      .option(HoodieBootstrapConfig.BASE_PATH.key(), "s3://bucket/snapshots-hudi/ge11/snapshot")
//      .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(TABLE_NAME, "GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11/snapshot");

ad1happy2go commented 1 year ago

@chandu-1101 The above code snippet should work as is. Can you paste the full stack trace, please?

ad1happy2go commented 1 year ago

I tried this and it works fine. Can you try it?

import org.apache.hudi.QuickstartUtils
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.Function
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.hudi.DataSourceWriteOptions

val snapshotDf = spark.read.parquet("<Parquet Path>")

snapshotDf.registerTempTable("snapshot");
val snapshotDf2 = spark.sql("select * from snapshot where invoiceid is not null and invoiceid !='' ")

val snapshotDf3 = snapshotDf2.withColumn("hash", lit(col("invoiceid").hashCode() %10) )

snapshotDf3.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "orderdate")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "invoiceid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
.option(TBL_NAME.key(), "GE11")
.mode(Overwrite)
.save("file:///tmp/issue_9141");

chandu-1101 commented 1 year ago

The above is the full stack trace. I did a fresh run and am posting it again here. Could any of these be the reason for the failure?

  1. My source Parquet file is 40 GB. I reran with val snapshotDf3 = snapshotDf2.withColumn("hash", (hash(col("cdc_oid")) %50) ) --> meaning roughly 100 partitions
  2. My partitioning is based on a hash of the primary key, with a maximum of 100 partitions?
  3. Executor memory is 3G, driver memory is 1G, executor cores 1, driver cores 1?
  4. Each row is heavy, roughly 10 kB to 20 kB
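
On point 1: Spark's `hash` returns a signed 32-bit integer and `%` keeps the sign of the dividend, so `hash(...) % 50` can take any value from -49 to 49, which is up to 99 distinct partition values. The plain-Scala sketch below illustrates the arithmetic. The integers are made up to stand in for hash outputs (they are not real `hash` results), and `Math.floorMod` is shown as one way to keep buckets within 0 to 49.

```scala
object BucketArithmetic {
  // Hypothetical signed values standing in for hash outputs; not real data.
  val sampleHashes: Seq[Int] = Seq(-1234567, -50, -1, 0, 1, 49, 50, 987654321)

  // The remainder operator keeps the sign of the dividend, so buckets can be negative.
  val signedBuckets: Seq[Int] = sampleHashes.map(_ % 50)

  // floorMod always lands in the range 0 until 50.
  val nonNegativeBuckets: Seq[Int] = sampleHashes.map(h => Math.floorMod(h, 50))

  // Number of distinct bucket values each scheme can produce over all Ints.
  val signedRangeSize: Int = (-49 to 49).size
  val nonNegativeRangeSize: Int = (0 until 50).size

  def main(args: Array[String]): Unit = {
    println(s"signed buckets:       $signedBuckets")
    println(s"non-negative buckets: $nonNegativeBuckets")
    println(s"distinct values: $signedRangeSize (signed) vs $nonNegativeRangeSize (floorMod)")
  }
}
```

In Spark itself, the `pmod` function in `org.apache.spark.sql.functions` plays the role that `Math.floorMod` plays here.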

07-10 12:19:14 INBRUS_645d0a45cea0fd64c95d773c_bid_9223370348308073967__merge_ja WARN DAGScheduler: Broadcasting large task binary with size 1053.0 KiB
07-10 12:19:59 INBRUS_645d0a45cea0fd64c95d773c_bid_9223370348308073967__merge_ja ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 59 elided

ad1happy2go commented 1 year ago

@chandu-1101

chandu-1101 commented 1 year ago

@ad1happy2go Thank you for the reply. I will check these and get back to you.

chandu-1101 commented 1 year ago

Hi,

After trimming down the data files etc., the issue is still reproducible.

  1. The Parquet file has complex data structures; during the bulk insert we insert only 4 rows into the Hudi table.
  2. The same goes for the CDC data (schema file below); during the merge the CDC data has only 10 rows. The snapshot and CDC schemas differ in the position of cdc_pk and cdc_oid (at the start in one, at the end in the other). In addition, a few columns (4-5) are typed as string in the CDC schema, while the same columns are typed as bool in the snapshot schema. I thought Hudi does some schema evolution.
  3. I am getting the error below during the merge of CDC data into the Hudi table.
  4. This ran on Spark on EMR.

code

    val sess = Application.spark();
    /* get snapshot df; cdc df  */
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
    val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/ge11-schema.json")
    val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")
    /* done */

    /* merge them */
    snapshotDf.createOrReplaceTempView("snapshot")
    val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk",lit("0"))

    snapshotDf2.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME,"GE11")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.name())
      .option(HoodieBootstrapConfig.BASE_PATH.key(), "s3://bucket/snapshots-hudi/ge11-drop/snapshot")
      .option("hoodie.datasource.write.table.type","COPY_ON_WRITE")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot");

    cdcDf.createOrReplaceTempView("cdc")
    val _cdcDf = sess.sql("select * from cdc where _id.oid is not null and _id.oid !='' limit 10 ")
    _cdcDf.createOrReplaceTempView("_cdc");
    _cdcDf.write.format("hudi")
      .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "cdc_pk")
      .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_id.oid")
      .option("hoodie.datasource.write.operation", "upsert")
      .option(TBL_NAME.key(), "GE11")
      .mode(SaveMode.Append)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot"); // <<<<<< ERROR here
    /* done */

ERROR


Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:721)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:350)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 59 elided
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:138)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:166)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
  ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:164)
  ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
  ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
  at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file s3://bucket/snapshots-hudi/ge11-drop/snapshot/370cddda-d0f5-415a-8338-55135a49f3cf-0_0-337-0_20230711122023911.parquet
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
  ... 8 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.avro.AvroConverters$FieldUTF8Converter
  at org.apache.parquet.io.api.PrimitiveConverter.addBoolean(PrimitiveConverter.java:77)
  at org.apache.parquet.column.impl.ColumnReaderBase$2$5.writeValue(ColumnReaderBase.java:362)
  at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
  at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
  at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
  ... 11 more

Schemas:

https://www.diffchecker.com/a2xRibWv/
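
The innermost cause in the trace above (`PrimitiveConverter.addBoolean` invoked on a `FieldUTF8Converter`) is what one would expect when a Parquet column stored as boolean is read with a schema that declares it as string. That matches the bool/string difference described in point 2. One possible workaround, sketched below under assumptions, is to cast the incoming CDC columns to the types already used by the table before the upsert. The helper name `alignToTableSchema` and the miniature DataFrames are hypothetical, the helper only handles top-level columns, and it has not been run against the data in this issue.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object SchemaAlignSketch {
  // Casts every top-level column of `incoming` whose type differs from the
  // same-named column in `tableSchema`; other columns pass through unchanged.
  def alignToTableSchema(incoming: DataFrame, tableSchema: StructType): DataFrame = {
    val tableTypes = tableSchema.fields.map(f => f.name -> f.dataType).toMap
    val aligned = incoming.schema.fields.map { f =>
      // Backticks keep a dotted column name from being read as a nested path.
      val c = col(s"`${f.name}`")
      tableTypes.get(f.name) match {
        case Some(t) if t != f.dataType => c.cast(t).as(f.name)
        case _                          => c
      }
    }
    incoming.select(aligned: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("schema-align-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical miniature data: `active` is boolean in the table, string in the CDC feed.
    val table = Seq(("k1", true)).toDF("cdc_oid", "active")
    val cdc = Seq(("k1", "false"), ("k2", "true")).toDF("cdc_oid", "active")

    val aligned = alignToTableSchema(cdc, table.schema)
    aligned.printSchema()

    spark.stop()
  }
}
```

With the names used in this thread, the call would look like `alignToTableSchema(_cdcDf, snapshotDf2.schema)` ahead of the upsert; type differences inside nested structs would need additional handling.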

chandu-1101 commented 1 year ago

In addition to my previous post:

When I change the Scala code to the version below (exactly as described in the quick start), the insert itself fails.

    val sess = Application.spark();
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
    val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11-schema.json")
    val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")

    snapshotDf.createOrReplaceTempView("snapshot")
    val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk",lit("0"))

    snapshotDf2.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME,"GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot");

I start the spark-shell as follows:

spark-shell \
  --driver-memory 1g \
  --executor-memory 4g \
  --executor-cores 1 \
  --driver-cores 1 \
  --conf spark.dynamicAllocation.maxExecutors=2 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
  --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --name ravic \
  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar

Exception (I am unable to get the basic insert working)

07-11 15:58:17 ${sys:config.appname} WARN DAGScheduler: Broadcasting large task binary with size 1032.2 KiB
07-11 15:58:18 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 59 elided
chandu-1101 commented 1 year ago

I think I have narrowed the issue down to an extent.

  1. I added columns to the select query one at a time and re-ran the insert each time. At one column things broke.
  2. Then I adjusted the limit X in the SQL query (code below); after several iterations I found that things break at limit 472. So something goes wrong with the data once the 471st row is crossed. What is it? The JSON schemas of the 471st and 472nd rows look similar (https://www.diffchecker.com/QRbXbzd5/).
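
The manual search over the limit in step 2 can be sketched as a binary search. This is a minimal sketch, not code from this thread: `firstFailingLimit` and the stand-in predicate are hypothetical, and it assumes failures are monotone, meaning that once the offending row falls inside the ordered prefix, every larger limit fails too.

```scala
// Hypothetical helper: smallest n in [lo, hi] for which fails(n) is true.
// Assumes fails is monotone (false ... false, true ... true) and fails(hi) is true.
def firstFailingLimit(lo: Int, hi: Int)(fails: Int => Boolean): Int = {
  var low = lo
  var high = hi
  while (low < high) {
    val mid = low + (high - low) / 2
    // A failing midpoint means the first failure is at mid or earlier.
    if (fails(mid)) high = mid else low = mid + 1
  }
  low
}

// Stand-in predicate for illustration only. In a real session it would run the
// write for the given limit and report whether it threw, e.g. via scala.util.Try.
val firstBad = firstFailingLimit(1, 1000)(limit => limit >= 472)
println(firstBad) // 472
```

Each probe is one full write attempt, so about ten probes cover a range of 1000 rows.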

What is strange to me is:

  1. As of now I use Spark SQL (no Hudi) to merge the snapshot (parquet file) with the CDC data (JSON files), and this has worked well for 1.5 years, meaning the existing schema is still intact.
  2. Then why does it fail, with an error, with Hudi?
  3. One more: if I change the code to use bulk insert, the insert works (the code is in my previous posts). But the subsequent merge fails, and I don't know why.

spark-shell command

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true"  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1    --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar

Code

    import org.apache.commons.lang3.ClassUtils.getCanonicalName
    import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
    import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
    import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions

    import java.util
    import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
    import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, hash, lit}
    import org.apache.hudi.QuickstartUtils._

    val sess = spark;
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
    snapshotDf.cache()
    snapshotDf.registerTempTable("snapshot")

    // 472 is the culprit
    val snapshotDf2 = sess.sql("select * from snapshot order by _id.oid limit 472 ")   
    snapshotDf2.registerTempTable("snapshot2") 
    val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs  from snapshot2 ")
    snapshotDf3.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME, "GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot");
chandu-1101 commented 1 year ago

Any update or pointers? Why does this happen?

ad1happy2go commented 1 year ago

@chandu-1101 In the command I see you are using both --packages and --jars, and with different Hudi versions (0.13.1 and 0.12.3):

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar

You should either remove the Hudi Spark bundle from --jars or drop --packages.
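
One way to see which copy of a class actually won on the classpath is to ask the JVM where the class was loaded from. The snippet below is a minimal sketch using only standard JDK calls; `jarOf` is a hypothetical helper name, not part of Hudi or Spark.

```scala
// Hypothetical helper: location (jar or directory) a class was loaded from.
// Returns None when the JVM exposes no code source, as for JDK bootstrap classes.
def jarOf(cls: Class[_]): Option[String] =
  for {
    domain   <- Option(cls.getProtectionDomain)
    source   <- Option(domain.getCodeSource)
    location <- Option(source.getLocation)
  } yield location.toString

// Works for any class on the classpath.
println(jarOf(classOf[String]))

// In the spark-shell session from this thread, one would pass a Hudi class instead,
// for example the one at the top of the stack trace:
// println(jarOf(Class.forName("org.apache.hudi.DefaultSource")))
```

If the printed path points at the 0.12.3 bundle while 0.13.1 was expected (or the other way round), the two bundles are competing.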

chandu-1101 commented 1 year ago

OK, will recheck. Thank you for the update. I will post back.

chandu-1101 commented 1 year ago

Hi,

@ad1happy2go I get the same exception again. Below are the 4 variants I tried, followed by the full warnings and the full stack trace.

Is Hudi sensitive to column-name case? That is, if I have 2 columns with the same name but different case, does Hudi fail?

spark-shell command: --jars with my custom jar, --packages referring to the Hudi bundle

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true"  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1    --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar

spark-shell command: --jars with my custom jar and the Hudi bundle jar

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true"  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar

spark-shell command: --jars with the Hudi bundle jar only (no custom jar)

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true"  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic     --jars /home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar

spark-shell command: only the --packages switch (no custom jar, no Hudi jar)

spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true"  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 

Warnings I get when I run the code (exactly the same code as above)

07-18 08:58:36 ${sys:config.appname} WARN HoodieSparkSqlWriter$: hoodie table at s3://bucket/snapshots-hudi/ge11-drop/snapshot already exists. Deleting existing data & overwriting with new data.
07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInWrite' instead.
07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInRead' instead.
07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.
07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.
07-18 08:58:39 ${sys:config.appname} WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
07-18 08:58:39 ${sys:config.appname} WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
07-18 08:58:41 ${sys:config.appname} WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
07-18 08:58:41 ${sys:config.appname} WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hadoop@172.25.26.169
07-18 08:58:43 ${sys:config.appname} WARN HoodieBackedTableMetadata: Metadata table was not found at path s3://bucket/snapshots-hudi/ge11-drop/snapshot/.hoodie/metadata

Exception: the same again. Please note that we have columns with the same name but different case, hence spark.sql.caseSensitive=true in the spark-shell command.

07-18 08:58:43 ${sys:config.appname} WARN HoodieBackedTableMetadata: Metadata table was not found at path s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot/.hoodie/metadata
07-18 08:59:03 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 49 elided

scala>

Kindly let me know if I can do something more to get this working?

chandu-1101 commented 1 year ago

To rule out the column case-sensitivity issue, I renamed all columns to static strings with indexes, except for _id.oid, cdc_pk, and addressLogs. I also printed the schema of addressLogs and made sure none of the column names repeat. Yet I get the same exception again.

NOTE: I am running on AWS EMR with only Ganglia and Spark selected (Hive, Glue, and Hudi are not selected).

addressLogs column schema

 |-- addressLogs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addressLines: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- createdDate: string (nullable = true)
 |    |    |-- fieldId: string (nullable = true)
 |    |    |-- isDerived: boolean (nullable = true)
 |    |    |-- latLong: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- locationIp: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- continentCode: string (nullable = true)
 |    |    |    |-- continentName: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- countryIsoCode: string (nullable = true)
 |    |    |    |-- latitude: string (nullable = true)
 |    |    |    |-- longitude: string (nullable = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- registeredCountry: string (nullable = true)
 |    |    |    |-- registeredCountryIsoCode: string (nullable = true)
 |    |    |    |-- subDivisions: string (nullable = true)
 |    |    |    |-- subDivisionsIsoCode: string (nullable = true)
 |    |    |    |-- timeZone: string (nullable = true)
 |    |    |-- original: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |-- residentialType: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- sourceType: string (nullable = true)
 |    |    |-- standardized: boolean (nullable = true)
 |    |    |-- standardizedDate: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- updatedDate: string (nullable = true)
 |    |    |-- zipCode: string (nullable = true)

final code

    import org.apache.spark.sql.{Column, DataFrame}

    import org.apache.commons.lang3.ClassUtils.getCanonicalName
    import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
    import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
    import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions

    import java.util
    import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
    import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, hash, lit}
    import org.apache.hudi.QuickstartUtils._

    def renameColumnsWithIndex(df: DataFrame): DataFrame = {
      // Columns that keep their original names (record key, precombine field, addressLogs).
      def keep(name: String): Boolean =
        name == "cdc_pk" || name.contains("_id") || name.contains("oid") || name.contains("addressLogs")
      // Candidate names, one per column position, printed for reference.
      println(df.columns.indices.map(i => s"_index_$i"))
      // Rename every other column to _index_<position>, threading the DataFrame through the fold.
      df.columns.zipWithIndex.foldLeft(df) {
        case (acc, (oldName, _)) if keep(oldName) => acc
        case (acc, (oldName, i)) =>
          val newName = s"_index_$i"
          println(oldName + " -> " + newName)
          acc.withColumnRenamed(oldName, newName)
      }
    }

    val sess = spark;
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
    snapshotDf.cache()
    snapshotDf.registerTempTable("snapshot")

    // 472 is the culprit
    val snapshotDf2 = renameColumnsWithIndex(sess.sql("select * from snapshot order by _id.oid limit 472 ")  ) 
    snapshotDf2.registerTempTable("snapshot2") 
    // val snapshotDf2 = sess.sql("select _id, cdc_pk,eventId, additionalConfig, additionalFields, additionalRequestInfo, address, addressLogs, alertData,alertUrl,applySources,applyTransactionId,atsId, experience from snapshot ")
    val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs  from snapshot2 ")
    snapshotDf3.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME, "GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot");

exception

07-18 10:24:07 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 53 elided
chandu-1101 commented 1 year ago

Can any contributor help here? This seemingly simple insert into Hudi fails!

ad1happy2go commented 1 year ago

Are you part of the community Slack? Can we connect over there? You can ping me, "Aditya Goenka".
This is a very simple case that a lot of users rely on; I can connect and check your setup.

chandu-1101 commented 1 year ago

Hi,

I found the root cause. My data had nested JSON with arrays containing null values, and in that case the write to Hudi fails.

So, given that I have to merge some 500 collections, I would now have to write code that recursively traverses every cell of the DataFrame and removes nulls or substitutes empty/default values, which is painful.

ad1happy2go commented 1 year ago

I connected with @chandu-1101, and the write was failing for one of the records. As discussed, can you post the sample record for which it was failing? It will help us fix this if possible.

chandu-1101 commented 1 year ago

The fix is to change the below

"addressLines": [null],

to

"addressLines": [""],

in the source JSON.
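
When the source JSON cannot be edited, the same substitution could be applied on the DataFrame before the write. The following is a minimal sketch for this one field only, not a general recursive cleaner: `fillNullAddressLines` is a hypothetical helper name, it assumes Spark 3.1 or later (for `Column.withField`), and it was not verified against Hudi in this thread.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit, transform}

// Hypothetical helper: replace null elements of addressLogs[*].addressLines with "".
// An addressLines array that is itself null (field absent in the JSON) stays null.
def fillNullAddressLines(df: DataFrame): DataFrame =
  df.withColumn(
    "addressLogs",
    // Outer transform walks the array of address-log structs.
    transform(col("addressLogs"), log =>
      log.withField(
        "addressLines",
        // Inner transform walks the strings and swaps null for the empty string.
        transform(log.getField("addressLines"), line => coalesce(line, lit("")))
      )
    )
  )
```

In the reproduction below, this would be used as `fillNullAddressLines(df1).write.format("hudi")` with the same options. Covering every array in some 500 collections would need a walk over `df.schema` instead of a hard-coded field name.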

Code to reproduce the issue:

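    // json1 is a String holding the record listed below; toDS relies on spark.implicits._,
    // which spark-shell imports automatically.
    val df1 = spark.read.json(Seq(json1).toDS)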
    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.commons.lang3.ClassUtils.getCanonicalName
    import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
    import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
    import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions

    import java.util
    import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
    import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, hash, lit}
    import org.apache.hudi.QuickstartUtils._

    df1.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(HoodieWriteConfig.TABLE_NAME, "GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot1");

The record that doesn't work

{
    "_id": {
        "oid": "1"
    },
    "cdc_pk": "45",
    "addressLogs": [{
        "createdDate": "2021-09-06T17:17:41.576Z",
        "fieldId": "eb4b6bd9-1fc0-4d38-b2d4-4cba87bb65a4",
        "isDerived": false,
        "location": "hyderabad (HC) PL",
        "original": {
            "location": "hyderabad (HC) PL"
        },
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-29T20:44:26.788Z"
    }, {
        "addressLines": [null],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-29T20:44:26.788Z",
        "fieldId": "1beefa35-7d08-4ca7-9fe1-88e59abb4c89",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-29T20:44:26.788Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-29T20:44:26.788Z",
        "fieldId": "42720793-1920-4a35-9e3e-23f91e00341e",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p2",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2023-06-29T20:44:26.788Z",
        "zipCode": "02-583"
    }, {
        "addressLines": [null],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-28T19:48:31.948Z",
        "fieldId": "7b56cdae-fbbc-4dd4-996e-6214b590db4a",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-28T19:48:31.948Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-28T19:48:31.948Z",
        "fieldId": "27e67381-c688-4879-a0a4-319ae051dca8",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p2",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2023-06-28T19:48:31.948Z",
        "zipCode": "02-583"
    }, {
        "addressLines": [null],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2021-11-26T14:49:38.305Z",
        "fieldId": "0352928b-1a42-40d7-81fd-cf711a2ecda1",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p4",
        "standardized": false,
        "updatedDate": "2022-03-24T13:52:11.876Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2021-11-26T14:49:38.305Z",
        "fieldId": "058fc73f-559c-414c-8dfb-9ad830fbbdf3",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p4",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2022-03-24T13:52:11.876Z",
        "zipCode": "02-583"
    }, {
        "createdDate": "2021-09-03T16:36:57.802Z",
        "fieldId": "c8c7e9c7-f7ad-4df4-90d2-5d07eeaa141f",
        "isDerived": false,
        "location": "hyderabad (HC) PL",
        "original": {
            "location": "hyderabad (HC) PL"
        },
        "source": "p4",
        "standardized": false,
        "updatedDate": "2022-03-24T13:52:11.876Z"
    }, {
        "country": "Srilanka",
        "createdDate": "2020-01-29T16:14:00.050Z",
        "fieldId": "4fbcc142-565d-4aa4-af00-6daa807dd951",
        "isDerived": false,
        "location": "Srilanka",
        "locationIp": {
            "city": "Amsterdam",
            "continentCode": "EU",
            "continentName": "Europe",
            "country": "Netherlands",
            "countryIsoCode": "NL",
            "latitude": "52.3759",
            "longitude": "4.8975",
            "postalCode": "1012",
            "registeredCountry": "United Kingdom",
            "registeredCountryIsoCode": "GB",
            "subDivisions": "North Holland",
            "subDivisionsIsoCode": "NH",
            "timeZone": "Europe/Amsterdam"
        },
        "original": {
            "country": "Srilanka",
            "location": "Srilanka"
        },
        "source": "p9",
        "standardized": false,
        "updatedDate": "2023-02-25T19:31:23.901Z"
    }, {
        "addressLines": [null],
        "city": "hyderabad",
        "country": "POL",
        "createdDate": "2021-08-10T16:34:32.662Z",
        "fieldId": "48318942-e268-4d66-8084-e19e302a73d7",
        "isDerived": false,
        "location": "hyderabad, POL",
        "original": {
            "city": "hyderabad",
            "country": "POL",
            "location": "hyderabad, POL"
        },
        "source": "p11",
        "standardized": false,
        "updatedDate": "2021-08-11T10:47:02.326Z"
    }]
}

The corrected record that works

{
    "_id": {
        "oid": "1"
    },
    "cdc_pk": "45",
    "addressLogs": [{
        "createdDate": "2021-09-06T17:17:41.576Z",
        "fieldId": "eb4b6bd9-1fc0-4d38-b2d4-4cba87bb65a4",
        "isDerived": false,
        "location": "hyderabad (HC) PL",
        "original": {
            "location": "hyderabad (HC) PL"
        },
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-29T20:44:26.788Z"
    }, {
        "addressLines": [""],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-29T20:44:26.788Z",
        "fieldId": "1beefa35-7d08-4ca7-9fe1-88e59abb4c89",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-29T20:44:26.788Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-29T20:44:26.788Z",
        "fieldId": "42720793-1920-4a35-9e3e-23f91e00341e",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p2",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2023-06-29T20:44:26.788Z",
        "zipCode": "02-583"
    }, {
        "addressLines": [""],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-28T19:48:31.948Z",
        "fieldId": "7b56cdae-fbbc-4dd4-996e-6214b590db4a",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p2",
        "standardized": false,
        "updatedDate": "2023-06-28T19:48:31.948Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2023-06-28T19:48:31.948Z",
        "fieldId": "27e67381-c688-4879-a0a4-319ae051dca8",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p2",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2023-06-28T19:48:31.948Z",
        "zipCode": "02-583"
    }, {
        "addressLines": [""],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2021-11-26T14:49:38.305Z",
        "fieldId": "0352928b-1a42-40d7-81fd-cf711a2ecda1",
        "isDerived": false,
        "location": "hyderabad, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, Srilanka"
        },
        "residentialType": "HOME",
        "source": "p4",
        "standardized": false,
        "updatedDate": "2022-03-24T13:52:11.876Z"
    }, {
        "addressLines": ["raddison,Business Park,cly B-"],
        "city": "hyderabad",
        "country": "Srilanka",
        "createdDate": "2021-11-26T14:49:38.305Z",
        "fieldId": "058fc73f-559c-414c-8dfb-9ad830fbbdf3",
        "isDerived": false,
        "location": "hyderabad, TN, Srilanka",
        "original": {
            "city": "hyderabad",
            "country": "Srilanka",
            "location": "hyderabad, TN, Srilanka",
            "state": "TN"
        },
        "residentialType": "WORK",
        "source": "p4",
        "standardized": false,
        "state": "TN",
        "updatedDate": "2022-03-24T13:52:11.876Z",
        "zipCode": "02-583"
    }, {
        "createdDate": "2021-09-03T16:36:57.802Z",
        "fieldId": "c8c7e9c7-f7ad-4df4-90d2-5d07eeaa141f",
        "isDerived": false,
        "location": "hyderabad (HC) PL",
        "original": {
            "location": "hyderabad (HC) PL"
        },
        "source": "p4",
        "standardized": false,
        "updatedDate": "2022-03-24T13:52:11.876Z"
    }, {
        "country": "Srilanka",
        "createdDate": "2020-01-29T16:14:00.050Z",
        "fieldId": "4fbcc142-565d-4aa4-af00-6daa807dd951",
        "isDerived": false,
        "location": "Srilanka",
        "locationIp": {
            "city": "Amsterdam",
            "continentCode": "EU",
            "continentName": "Europe",
            "country": "Netherlands",
            "countryIsoCode": "NL",
            "latitude": "52.3759",
            "longitude": "4.8975",
            "postalCode": "1012",
            "registeredCountry": "United Kingdom",
            "registeredCountryIsoCode": "GB",
            "subDivisions": "North Holland",
            "subDivisionsIsoCode": "NH",
            "timeZone": "Europe/Amsterdam"
        },
        "original": {
            "country": "Srilanka",
            "location": "Srilanka"
        },
        "source": "p9",
        "standardized": false,
        "updatedDate": "2023-02-25T19:31:23.901Z"
    }, {
        "addressLines": [""],
        "city": "hyderabad",
        "country": "POL",
        "createdDate": "2021-08-10T16:34:32.662Z",
        "fieldId": "48318942-e268-4d66-8084-e19e302a73d7",
        "isDerived": false,
        "location": "hyderabad, POL",
        "original": {
            "city": "hyderabad",
            "country": "POL",
            "location": "hyderabad, POL"
        },
        "source": "p11",
        "standardized": false,
        "updatedDate": "2021-08-11T10:47:02.326Z"
    }]
}

chandu-1101 commented 1 year ago

Hi,

Just wanted to know if this bug is planned to be fixed?

ad1happy2go commented 1 year ago

@chandu-1101 I created a critical JIRA for this (https://issues.apache.org/jira/browse/HUDI-6589). We will take this up soon.

ad1happy2go commented 1 year ago

@chandu-1101 After further investigation and debugging, this turns out to be an Avro-Parquet compatibility issue. To allow arrays with null elements, you need to set the Spark configuration parameter spark.hadoop.parquet.avro.write-old-list-structure to false.

This configuration parameter controls how Avro arrays are written to Parquet. By default (true), parquet-avro writes arrays in the legacy two-level list structure, which cannot represent null elements. Setting spark.hadoop.parquet.avro.write-old-list-structure to false switches to the three-level list structure, which supports null elements, so such arrays are handled correctly during the write.

This was not a Hudi issue. I was able to insert the record you pasted just by setting --conf 'spark.hadoop.parquet.avro.write-old-list-structure=false'.
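
For anyone who cannot restart spark-shell with the extra --conf, here is a minimal sketch of setting the same Parquet option on an already running session. It assumes `spark` is the SparkSession that spark-shell provides, and that the writer reads the Hadoop configuration at write time. That second point is an assumption and was not verified in this thread; passing --conf at launch is the approach that was confirmed to work.

```scala
// Sketch only: `spark` is assumed to be the SparkSession created by spark-shell.
// The "spark.hadoop." prefix used with --conf is dropped here, because the key is
// set directly on the Hadoop configuration rather than on the Spark conf.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("parquet.avro.write-old-list-structure", "false")

// Read the value back to confirm it is in place before running the Hudi write.
println(hadoopConf.get("parquet.avro.write-old-list-structure"))
```

If the write still fails with the same error after this, restart the shell with the --conf flag instead.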

chandu-1101 commented 1 year ago

Wow! Wonderful. Thank you once again. I will set the flag, check, and get back.

chandu-1101 commented 1 year ago

Working as expected, thank you.