apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.39k stars 2.42k forks source link

Schema Evolution: Missing column for previous records when new entry does not have the same while upsert. #5452

Closed santoshsb closed 2 years ago

santoshsb commented 2 years ago

Hi Team,

We are currently evaluating Hudi for our analytical use cases and as part of this exercise we are facing few issues with schema evolution and data loss. The current issue which we have encountered is while updating a record. We have currently inserted a single record with the following schema root |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

now when we insert the new data with the following schema

root |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

The update is successful but the schema is missing the
|-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true)

field. our expected behaviour was that after adding the second entry, the new column "multipleBirthBoolean" will be added to the overall schema and the previous column "maritalStatus" struct will be retained and will be null for the second entry. The final schema looks like this, root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

Basically when a new entry is added and it is missing a column from the destination schema the update is successful and the missing column vanishes from the previous entries. Let us know if we are missing any configuration options. We cannot control the schema as its defined by FHIR standards (https://www.hl7.org/fhir/patient.html#resource) most of the fields here are optional so the incoming data from our customers will be missing certain columns.

Environment Description

Thanks for the help.

yihua commented 2 years ago

@santoshsb could you post the Hudi write config used to write the table and the commands to reproduce the problem? @xiarixiaoyao could you provide some insights around schema evolution?

santoshsb commented 2 years ago

Thanks @yihua, here are the detailed spark shell commands we used

`./spark-shell --jars '/Users/balamats/work/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0-SNAPSHOT.jar' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}"""

//Convert to dataframe val orgStringDf = spark.read.json(Seq(orgString).toDS)

//Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "patient_hudi", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated", DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true" )

//Write the orgStringDf to a Hudi table orgStringDf.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

//Read the Hudi table val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

//Printschema patienthudi.printSchema root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

//Select fields to verify patienthudi.select("id","gender","maritalStatus").show(false) +------------------------------------+------+---------------------------------------------------------------------+ |id |gender|maritalStatus | +------------------------------------+------+---------------------------------------------------------------------+ |4ad86a5c-926e-439b-9352-f8ac9ab780f1|male |{[{M, M, http://terminology.hl7.org/CodeSystem/v3-MaritalStatus}], M}| +------------------------------------+------+---------------------------------------------------------------------+

//Update: Based on our usecase add a new patient resource, this resource might contain new columns and might not have existing columns (normal use case with FHIR data)

val updatedString = """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""

//Convert the new resource string into DF val updatedStringDf = spark.read.json(Seq(updatedString).toDS)

//Check the schema of the new resource that is being added updatedStringDf.printSchema root |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

//Upsert the new resource updatedStringDf.write .format("org.apache.hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

//Read the Hudi table val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

//Print the schema after adding the new record patienthudi.printSchema root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

//Select fields to verify patienthudi.select("id","gender","maritalStatus").show(false) org.apache.spark.sql.AnalysisException: cannot resolve 'maritalStatus' given input columns: [_hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, birthDate, gender, id, lastUpdated, multipleBirthBoolean, resourceType, source]; 'Project [id#130, gender#129, 'maritalStatus] +- Relation [_hoodie_commit_time#123,_hoodie_commit_seqno#124,_hoodie_record_key#125,_hoodie_partition_path#126,_hoodie_file_name#127,birthDate#128,gender#129,id#130,lastUpdated#131,multipleBirthBoolean#132,resourceType#133,source#134] parquet

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88) at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3734) at org.apache.spark.sql.Dataset.select(Dataset.scala:1454) at org.apache.spark.sql.Dataset.select(Dataset.scala:1471) ... 49 elided`

Our expectation after adding the second row was,

  1. The new column "multipleBirthBoolean" should have been added to the schema and would be null for the previous entry.
  2. The existing "maritalStatus" column present in the destination schema added by the first entry should be present after adding the second entry and should have been null for the second entry.

We might be missing some config or we feel that when we add a new entry it should contain all the columns present in the destination schema regardless if they are NULL they should be present, If we do need a uber schema we didn't find the spark code to convert our second dataframe "updatedStringDf" to add those columns with NULL values, basically reading the uber schema and merging it into "updatedStringDf" with NULL values. We did try these commands while creating the second dataframe

val updatedStringDf = spark.read.schema(patientHudi.schema).json(Seq(updatedString).toDS)

But than the new schema for the updatedStringDf misses the "multipleBirthBoolean" column present in the second entry.

root |-- birthDate: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

Thanks for the help. Santosh

xiarixiaoyao commented 2 years ago

@santoshsb multipleBirthBoolean is a new column to be added, but How to determine its added position? is it added as the last column or somewhere else ?

If the above questions can be answered , i think i can help you to solve the problem

santoshsb commented 2 years ago

@xiarixiaoyao we are not concerned about the position as long as its there in the schema (either as the last column or somewhere else) along with all the existing columns.

xiarixiaoyao commented 2 years ago

@santoshsb pls use follow code to solve your problem

    def createNewDF(df: DataFrame, oldTableSchema: StructType): DataFrame = {
      val writeSchema = df.schema
      val neededSchema = {
        val neededFields = mutable.ListBuffer[StructField]()
        oldTableSchema.foreach(neededFields.append(_))
        writeSchema.filterNot { col =>
          oldTableSchema.exists(targetCol => SQLConf.get.resolver(targetCol.name, col.name))
        }.foreach(neededFields.append(_))
        StructType(neededFields)
      }

      val missingCols = {
        neededSchema.zipWithIndex.map { case (field, index) =>
          (index,  writeSchema.indexWhere(p => SQLConf.get.resolver(p.name, field.name)))
        }.toMap
      }

      val filledRdd = df.rdd.mapPartitions { iter =>
        iter.map { row =>
          val tmp = new Array[Any](neededSchema.length)
          for (i <- (0 to tmp.length - 1)) {
            val index = missingCols.getOrDefault(i, -1)
            tmp.update(i, if (index != -1) row.get(index) else null)
          }
          Row.fromSeq(tmp)
        }
      }
      spark.createDataFrame(filledRdd, neededSchema)
    }

    val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}"""

    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val orgStringDf = spark.read.json(Seq(orgString).toDS())

    val hudiOptions = Map[String,String](
      HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
    )

    orgStringDf.write
      .format("org.apache.hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .options(hudiOptions)
      .mode(SaveMode.Overwrite)
      .save("/tmp/default/clustering/updateTst/json_schema_tst/hudi")

    val patienthudi = spark.read.format("hudi").load("/tmp/default/clustering/updateTst/json_schema_tst/hudi")

    val updatedString = """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""
    val updatedStringDf = createNewDF(spark.read.json(Seq(updatedString).toDS), patienthudi.schema)

    updatedStringDf.write
      .format("org.apache.hudi")
      .options(hudiOptions)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
      .mode(SaveMode.Append)
      .save("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
    val patienthudi1 = spark.read.format("hudi").load("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
    patienthudi1.select("id","gender","maritalStatus").show(false)
santoshsb commented 2 years ago

Hi @xiarixiaoyao , thanks for the code. It worked like a charm for the reduced json as provided above. After successfully testing it with the reduced schema, we used the complete schema (https://www.hl7.org/fhir/patient.html#resource). Even though the source and target schema are matching the following error is thrown while updating a record (both the schemas are provided below for reference),

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279) at org.apache.spark.rdd.RDD.count(RDD.scala:1253) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:646) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:314) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) ... 67 elided Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) ... 28 more Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:160) at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ... 31 more Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:154) ... 32 more Caused by: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278) at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:134) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4 more Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'coding' not found at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228) at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91) at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185) at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)

Current table schema is as follows,

scala> patienthudi.printSchema root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- address: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- city: string (nullable = true) | | |-- country: string (nullable = true) | | |-- extension: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- extension: array (nullable = true) | | | | | |-- element: struct (containsNull = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- valueDecimal: double (nullable = true) | | | | |-- url: string (nullable = true) | | |-- line: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- postalCode: string (nullable = true) | | |-- state: string (nullable = true) |-- birthDate: string (nullable = true) |-- communication: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- language: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) |-- extension: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- extension: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- url: string (nullable = true) | | | | |-- valueCoding: struct (nullable = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | | |-- valueString: string (nullable = true) | | |-- url: string (nullable = true) | | |-- valueAddress: struct (nullable = true) | | | |-- city: string (nullable = true) | | | |-- country: string (nullable = true) | | | |-- state: string (nullable = true) | | |-- valueCode: string (nullable = true) | | |-- valueDecimal: double (nullable = true) | | |-- valueString: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- identifier: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- system: string (nullable = true) | | |-- type: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) | | |-- value: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- managingOrganization: struct (nullable = true) | |-- reference: string (nullable = true) | |-- type: string (nullable = true) |-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true) |-- meta: struct (nullable = true) | |-- extension: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- url: string (nullable = true) | | | |-- valueString: string (nullable = true) | |-- lastUpdated: string (nullable = true) | |-- source: string (nullable = true) | |-- versionId: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- multipleBirthInteger: long (nullable = true) |-- name: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- family: string (nullable = true) | | |-- given: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- prefix: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- use: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true) |-- telecom: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- system: string (nullable = true) | | |-- use: string (nullable = true) | | |-- value: string (nullable = true) |-- text: struct (nullable = true) | |-- div: string (nullable = true) | |-- status: string (nullable = true)

Incoming/Update dataframe schema is as follows after using the code provided by you

scala> updatedStringDf.printSchema root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- address: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- city: string (nullable = true) | | |-- country: string (nullable = true) | | |-- extension: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- extension: array (nullable = true) | | | | | |-- element: struct (containsNull = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- valueDecimal: double (nullable = true) | | | | |-- url: string (nullable = true) | | |-- line: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- postalCode: string (nullable = true) | | |-- state: string (nullable = true) |-- birthDate: string (nullable = true) |-- communication: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- language: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) |-- extension: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- extension: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- url: string (nullable = true) | | | | |-- valueCoding: struct (nullable = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | | |-- valueString: string (nullable = true) | | |-- url: string (nullable = true) | | |-- valueAddress: struct (nullable = true) | | | |-- city: string (nullable = true) | | | |-- country: string (nullable = true) | | | |-- state: string (nullable = true) | | |-- valueCode: string (nullable = true) | | |-- valueDecimal: double (nullable = true) | | |-- valueString: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- identifier: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- system: string (nullable = true) | | |-- type: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) | | |-- value: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- managingOrganization: struct (nullable = true) | |-- reference: string (nullable = true) | |-- type: string (nullable = true) |-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true) |-- meta: struct (nullable = true) | |-- extension: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- url: string (nullable = true) | | | |-- valueString: string (nullable = true) | |-- lastUpdated: string (nullable = true) | |-- source: string (nullable = true) | |-- versionId: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- multipleBirthInteger: long (nullable = true) |-- name: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- family: string (nullable = true) | | |-- given: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- prefix: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- use: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true) |-- telecom: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- system: string (nullable = true) | | |-- use: string (nullable = true) | | |-- value: string (nullable = true) |-- text: struct (nullable = true) | |-- div: string (nullable = true) | |-- status: string (nullable = true)

We have seen this issue in the troubleshooting guide but thats when there is a schema, here both the schema are identical. Any pointers will be helpfull.

Thanks, Santosh

santoshsb commented 2 years ago

@xiarixiaoyao We did another test, we used this JSON string {"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","meta":{"extension":[{"url":"http://navify.com/fhir/StructureDefinition/createdBy","valueString":"00u4o6bbvAeNZkXKL296"},{"url":"http://navify.com/fhir/StructureDefinition/modifiedBy","valueString":"00u4o6bbvAeNZkXKL296"}],"versionId":"9","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a"},"text":{"status":"generated","div":"<divxmlns=\"http://www.w3.org/1999/xhtml\">Generatedby<ahref=\"https://github.com/synthetichealth/synthea\">Synthea</a>.Versionidentifier:v2.4.0-483-gad18e5f2\n.Personseed:8166940888549305472Populationseed:1648664610882</div>"},"extension":[{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-race","extension":[{"url":"ombCategory","valueCoding":{"system":"urn:oid:2.16.840.1.113883.6.238","code":"2106-3","display":"White"}},{"url":"text","valueString":"White"}]},{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity","extension":[{"url":"ombCategory","valueCoding":{"system":"urn:oid:2.16.840.1.113883.6.238","code":"2186-5","display":"NotHispanicorLatino"}},{"url":"text","valueString":"NotHispanicorLatino"}]},{"url":"http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName","valueString":"Deloise241Orn563"},{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex","valueCode":"F"},{"url":"http://hl7.org/fhir/StructureDefinition/patient-birthPlace","valueAddress":{"city":"Westwood","state":"Massachusetts","country":"US"}},{"url":"http://synthetichealth.github.io/synthea/disability-adjusted-life-years","valueDecimal":0.03310522209092858},{"url":"http://synthetichealth.github.io/synthea/quality-adjusted-life-years","valueDecimal":15.966894777909072}],"identifier":[{"system":"https://github.com/synthetichealth/synthea","value":"e6a8e22f-7cf2-4f07-8ad3-ec34479124da"},{"type":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v2-0203","code":"MR","display":"MedicalRecordNumber"}],"text":"MedicalRecordNumber"},"system":"http://hospital.smarthealthit.org","value":"e6a8e22f-7cf2-4f07-8ad3-ec34479124da"},{"type":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v2-0203","code":"SS","display":"SocialSecurityNumber"}],"text":"SocialSecurityNumber"},"system":"http://hl7.org/fhir/sid/us-ssn","value":"999-35-9642"}],"name":[{"use":"official","family":"Haley279","given":["Vinita997"]}],"telecom":[{"system":"phone","value":"555-213-3658","use":"home"}],"gender":"female","birthDate":"2005-08-30","address":[{"extension":[{"url":"http://hl7.org/fhir/StructureDefinition/geolocation","extension":[{"url":"latitude","valueDecimal":42.09490663875005},{"url":"longitude","valueDecimal":-70.82289517957093}]}],"line":["691RunolfsdottirParadeApt5"],"city":"Hanson","state":"Massachusetts","country":"US"}],"maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"S","display":"NeverMarried"}],"text":"NeverMarried"},"multipleBirthBoolean":false,"communication":[{"language":{"coding":[{"system":"urn:ietf:bcp:47","code":"en-US","display":"English"}],"text":"English"}}],"managingOrganization":{"reference":"7b3f7052-123b-46b7-a8b6-a0e87daaea03","type":"Organization"}}

First we inserted it and later the same dataframe was used to update without any modifications. The same error mentioned above was thrown.

Thanks, Santosh

santoshsb commented 2 years ago

We further took out the coding type from our JSON string one after the other the update worked for 2 elements (identifier and maritalstatus), it the coding type in the element "communication" which is breaking. So when we use this JSON string { "resourceType": "Patient", "id": "596c7a94-bada-4303-85d4-7067c586999e", "lastUpdated": "2022-04-20T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "meta": { "extension": [ { "url": "http://navify.com/fhir/StructureDefinition/createdBy", "valueString": "00u4o6bbvAeNZkXKL296" }, { "url": "http://navify.com/fhir/StructureDefinition/modifiedBy", "valueString": "00u4o6bbvAeNZkXKL296" } ], "versionId": "9", "lastUpdated": "2022-04-20T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a" }, "text": { "status": "generated", "div": "<divxmlns=\"http://www.w3.org/1999/xhtml\">Generatedby<ahref=\"https://github.com/synthetichealth/synthea\">Synthea</a>.Versionidentifier:v2.4.0-483-gad18e5f2\n.Personseed:8166940888549305472Populationseed:1648664610882</div>" }, "extension": [ { "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race", "extension": [ { "url": "ombCategory", "valueCoding": { "system": "urn:oid:2.16.840.1.113883.6.238", "code": "2106-3", "display": "White" } }, { "url": "text", "valueString": "White" } ] }, { "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity", "extension": [ { "url": "ombCategory", "valueCoding": { "system": "urn:oid:2.16.840.1.113883.6.238", "code": "2186-5", "display": "NotHispanicorLatino" } }, { "url": "text", "valueString": "NotHispanicorLatino" } ] }, { "url": "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName", "valueString": "Deloise241Orn563" }, { "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex", "valueCode": "F" }, { "url": "http://hl7.org/fhir/StructureDefinition/patient-birthPlace", "valueAddress": { "city": "Westwood", "state": "Massachusetts", "country": "US" } }, { "url": "http://synthetichealth.github.io/synthea/disability-adjusted-life-years", "valueDecimal": 0.03310522209092858 }, { "url": "http://synthetichealth.github.io/synthea/quality-adjusted-life-years", "valueDecimal": 15.966894777909072 } ], "name": [ { "use": "official", "family": "Haley279", "given": [ "Vinita997" ] } ], "telecom": [ { "system": "phone", "value": "555-213-3658", "use": "home" } ], "gender": "female", "birthDate": "2005-08-30", "address": [ { "extension": [ { "url": "http://hl7.org/fhir/StructureDefinition/geolocation", "extension": [ { "url": "latitude", "valueDecimal": 42.09490663875005 }, { "url": "longitude", "valueDecimal": -70.82289517957093 } ] } ], "line": [ "691RunolfsdottirParadeApt5" ], "city": "Hanson", "state": "Massachusetts", "country": "US" } ], "multipleBirthBoolean": false, "communication": [ { "language": { "coding": [ { "system": "urn:ietf:bcp:47", "code": "en-US", "display": "English" } ], "text": "English" } } ], "managingOrganization": { "reference": "7b3f7052-123b-46b7-a8b6-a0e87daaea03", "type": "Organization" } }

Insert the single record and then update the same record we see the error.

These are the schema for respective element, only communication throws the error when included `communication: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- language: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true)

identifier: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- system: string (nullable = true) | | |-- type: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) | | |-- value: string (nullable = true)

|-- maritalStatus: struct (nullable = true) | |-- coding: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- code: string (nullable = true) | | | |-- display: string (nullable = true) | | | |-- system: string (nullable = true) | |-- text: string (nullable = true)`

Thanks, Santosh

xiarixiaoyao commented 2 years ago

@santoshsb it was strange, let me try it

santoshsb commented 2 years ago

@xiarixiaoyao thanks for helping out, let me know if you need any more information.

santoshsb commented 2 years ago

@xiarixiaoyao FYI, we just tested this issue by building the release branch 0.11.0, here is the JSON string and the schema. { "resourceType": "Patient", "id": "596c7a94-bada-4303-85d4-7067c586999e", "lastUpdated": "2022-04-20T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "name": [ { "use": "official", "family": "Haley279", "given": [ "Vinita997" ] } ], "gender": "female", "birthDate": "2005-08-30", "multipleBirthBoolean": false, "communication": [ { "language": { "coding": [ { "system": "urn:ietf:bcp:47", "code": "en-US", "display": "English" } ], "text": "English" } } ] }

SCHEMA

scala> hudiPatientDF.printSchema root |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- birthDate: string (nullable = true) |-- communication: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- language: struct (nullable = true) | | | |-- coding: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- code: string (nullable = true) | | | | | |-- display: string (nullable = true) | | | | | |-- system: string (nullable = true) | | | |-- text: string (nullable = true) |-- gender: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- multipleBirthBoolean: boolean (nullable = true) |-- name: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- family: string (nullable = true) | | |-- given: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- use: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

We inserted the above JSON string and Updated the same, and it threw the following error which is same as above.

Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4 more Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'coding' not found at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228) at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91) at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185) at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)

Thanks, Santosh

santoshsb commented 2 years ago

@xiarixiaoyao @yihua we currently don't see this issue when we use the following configuration option --conf 'spark.hadoop.parquet.avro.write-old-list-structure=false'. Should be good to close this, though we are facing another issue with nested columns, will raise a different ticket for that ?

Thanks for all the support, Santosh

santoshsb commented 2 years ago

@xiarixiaoyao FYI, the createNewDF code throws the following error Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<string> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_3$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)

With the following data, Inserted (Schema simplified to highlight the issue), { "resourceType": "Patient", "id": "beca9a29-49bb-40e4-adff-4dbb4d664972", "lastUpdated": "2022-02-14T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "name": [ { "use": "official", "family": "Keeling57", "given": [ "Serina556" ], "prefix": [ "Ms." ] } ] }

Update { "resourceType": "Patient", "id": "beca9a29-49bb-40e4-adff-4dbb4d664972", "lastUpdated": "2022-02-14T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "name": [ { "use": "official", "family": "Keeling57", "given": [ "Serina556" ] } ] }

While updating with the second JSON the prefix is missing and based on the createNewDF it should add that column (verified) with null, null is treated as string and the type is array of string ?

Thanks, Santosh

xiarixiaoyao commented 2 years ago

@santoshsb createNewDF cannot support rewrite DataFrame with nested schema change.

santoshsb commented 2 years ago

thanks @xiarixiaoyao, our schema for storing data as defined by FHIR standards https://www.hl7.org/fhir/patient.schema.json.html seams to be complicated, as most of the fields here are optional the incoming data will always be missing few elements (nested as well as those on the root level). The missing root element is fixed by the code you provided, we are thinking on how to work with the nested fields missing issues.

kazdy commented 2 years ago

@santoshsb will something like this help with your use case:

5873

https://issues.apache.org/jira/browse/HUDI-4276 ?

santoshsb commented 2 years ago

@kazdy thanks for the followup, we had solved this issue at the root level of the schema by the code provided by @xiarixiaoyao. If you check the code (on the top of the post) it merges table columns into the incoming dataframe. As our schema was nested, we started facing similar issues with the nested columns which we have not resolved yet. And because we started adding null for the missing columns we faced this another issue, Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<string> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_3$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)

If we can handle schema changes at nested level than it will be helpful for us.

xiarixiaoyao commented 2 years ago

@santoshsb could you pls share me the nested columns case code. thanks i think may be we can solve this problem together with https://issues.apache.org/jira/browse/HUDI-4276

santoshsb commented 2 years ago

@xiarixiaoyao as mentioned earlier we didn't solve the nested column case, we are currently trying to finalize a fixed schema and while reading in the data with spark use this schema to avoid any schema evolution. Let me know if you need the sample examples which lead to the issue, here are the two messages.

{ "resourceType": "Patient", "id": "beca9a29-49bb-40e4-adff-4dbb4d664972", "lastUpdated": "2022-02-14T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "name": [ { "use": "official", "family": "Keeling57", "given": [ "Serina556" ], "prefix": [ "Ms." ] } ] }

This one missing the prefix, { "resourceType": "Patient", "id": "beca9a29-49bb-40e4-adff-4dbb4d664972", "lastUpdated": "2022-02-14T15:18:18.90836+05:30", "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a", "name": [ { "use": "official", "family": "Keeling57", "given": [ "Serina556" ] } ] }

kazdy commented 2 years ago

hi @santoshsb, take a look at this nice PR by @xiarixiaoyao: https://github.com/apache/hudi/pull/6017

nsivabalan commented 2 years ago

@santoshsb : is there anything more to be addressed, or can we close out the issue. I see the PR is merged.

santoshsb commented 2 years ago

Thanks for fixing this, we can close the issue.

santoshsb commented 2 years ago

@nsivabalan @xiarixiaoyao I tested this fix locally, checked out the latest master branch and built the code using the command mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12 used the generated jar and launched the spark-shell using the command ./spark-3.2.1-bin-hadoop3.2/bin/spark-shell \ --jars ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.hadoop.parquet.avro.write-old-list-structure=false' We still face the above mentioned issues, am I missing something here ?

codope commented 2 years ago

Please try without this config spark.hadoop.parquet.avro.write-old-list-structure. cc @xiarixiaoyao

santoshsb commented 2 years ago

@codope here is the output without the above mentioned config, have also added the code which am using for testing the fix. --------------ERROR-------------------- 22/09/07 18:53:08 ERROR BoundedInMemoryExecutor: error producing records) / 200] org.apache.hudi.exception.HoodieException: unable to read next record from parquet file at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'prefix' not found at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228) at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:480) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91) at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185) at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ... 8 more 22/09/07 18:53:09 ERROR BoundedInMemoryExecutor: error consuming records 1) / 1]

--------------CODE--------------------- ~/work/spark-3.2.1-bin-hadoop3.2/bin/spark-shell --jarsls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar` --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""

val orgStringDf = spark.read.json(Seq(orgString).toDS)

//Specify common DataSourceWriteOptions in the single hudiOptions variable

val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "patient_hudi", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated", DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")

//Write the orgStringDf to a Hudi table orgStringDf.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("/work/data/updateTst/hudi/json_schema_tst")

//Read the Hudi table val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

//Printschema patienthudi.printSchema

//Update: Based on our usecase add a new patient resource, this resource might contain new columns and might not have existing columns (normal use case with FHIR data)

val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""

//Convert the new resource string into DF val updatedStringDf = spark.read.json(Seq(updatedString).toDS)

//Check the schema of the new resource that is being added updatedStringDf.printSchema

//Upsert the new resource updatedStringDf.write .format("org.apache.hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("/work/data/updateTst/hudi/json_schema_tst")

//Read the Hudi table val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

//Print the schema after adding the new record patienthudiUpdated.printSchema`

xiarixiaoyao commented 2 years ago

@santoshsb you need use schema evolution and hoodie.datasource.write.reconcile.schema, see the follow codes

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor

    //Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few
    val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val orgStringDf = spark.read.json(Seq(orgString).toDS)

    //Specify common DataSourceWriteOptions in the single hudiOptions variable

    val hudiOptions = Map[String,String](
      HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")

    //Write the orgStringDf to a Hudi table
    orgStringDf.write
      .format("org.apache.hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .options(hudiOptions)
      .option("hoodie.schema.on.read.enable", "true")
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .mode(SaveMode.Overwrite)
      .save("/work/data/updateTst/hudi/json_schema_tst")
    //Read the Hudi table
    val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

    //Printschema
    patienthudi.printSchema
    //Update: Based on our usecase add a new patient resource, this resource might contain new columns and might not have existing columns (normal use case with FHIR data)

    val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""

    //Convert the new resource string into DF
    val updatedStringDf = spark.read.json(Seq(updatedString).toDS)

    //Check the schema of the new resource that is being added
    updatedStringDf.printSchema

    //Upsert the new resource
    updatedStringDf.write
      .format("org.apache.hudi")
      .options(hudiOptions)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
//      .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .option("hoodie.schema.on.read.enable", "true")
      .mode(SaveMode.Append)
      .save("/work/data/updateTst/hudi/json_schema_tst")

    //Read the Hudi table
    val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

    //Print the schema after adding the new record
    patienthudiUpdated.printSchema

    patienthudiUpdated.show(false)

patienthudiUpdated.schema: |-- _hoodie_commit_time: string (nullable = true) |-- _hoodie_commit_seqno: string (nullable = true) |-- _hoodie_record_key: string (nullable = true) |-- _hoodie_partition_path: string (nullable = true) |-- _hoodie_file_name: string (nullable = true) |-- id: string (nullable = true) |-- lastUpdated: string (nullable = true) |-- name: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- family: string (nullable = true) | | |-- given: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- prefix: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- use: string (nullable = true) |-- resourceType: string (nullable = true) |-- source: string (nullable = true)

i think it should be ok , thanks

santoshsb commented 2 years ago

@xiarixiaoyao thanks for the input, it works with these options .option("hoodie.schema.on.read.enable", "true") .option("hoodie.datasource.write.reconcile.schema", "true") as expected.

codope commented 2 years ago

Great! Gonna close this issue then. FYI, we also plan to flip the default for schema reconciliation in the next release. See #6196