apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.36k stars 2.42k forks source link

Failed insert schema compatibility mismatch issue #11277

Open SamarthRaval opened 4 months ago

SamarthRaval commented 4 months ago

Describe the problem you faced

To Reproduce

Steps to reproduce the behavior:

  1. Did bulk-insert loaded dataset.
  2. Ran insert operation ran into - Failed insert schema compatibility check
  3. when done step-2 with bulk-insert it ran fine and expanded the schema.
  4. Adding configurations I used for it.
        ImmutableMap.Builder<String, String> hudiOptions = ImmutableMap.<String, String>builder()
                .put("hoodie.table.name", tableName)
                .put("hoodie.datasource.write.recordkey.field", "uniqueId")
                .put("hoodie.datasource.write.precombine.field", "version")
                .put("hoodie.datasource.write.table.type", HoodieTableType.COPY_ON_WRITE.name())
                .put("hoodie.datasource.write.operation", operation)
                .put("hoodie.combine.before.insert", "true")
                .put("hoodie.datasource.write.keygenerator.class", SimpleKeyGenerator.class.getName())

                .put("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT")
                .put("hoodie.copyonwrite.record.size.estimate", "50")
                .put("hoodie.parquet.small.file.limit", "104857600")
                .put("hoodie.parquet.max.file.size", "125829120")

                .put("hoodie.write.set.null.for.missing.columns", "true")
                .put("hoodie.datasource.write.reconcile.schema", "true")

                .put("hoodie.datasource.write.partitionpath.field", PARTITION_COLUMN_NAME)
                .put("hoodie.datasource.hive_sync.partition_fields", PARTITION_COLUMN_NAME)

                .put("hoodie.datasource.hive_sync.enable", "true")
                .put("hoodie.datasource.write.hive_style_partitioning", "true")

                .put("hoodie.datasource.hive_sync.table", tableName)
                .put("hoodie.datasource.hive_sync.database", hudiDatabase)
                .put("hoodie.datasource.hive_sync.auto_create_database", "true")
                .put("hoodie.datasource.hive_sync.support_timestamp", "true")
                .put("hoodie.datasource.hive_sync.use_jdbc", "false")
                .put("hoodie.datasource.hive_sync.mode", "hms")
                .put("hoodie.datasource.hive_sync.partition_extractor_class", MultiPartKeysValueExtractor.class.getName())

                .put("hoodie.metadata.enable", "true")
                .put("hoodie.meta.sync.metadata_file_listing", "true")

                .put("hoodie.clean.automatic", "true")
                .put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
                .put("hoodie.cleaner.commits.retained", "30")
                .put("hoodie.cleaner.parallelism", "1000")

                .put("hoodie.archive.merge.enable", "true")
                .put("hoodie.commits.archival.batch", "30")

                .put("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
                .put("hoodie.cleaner.policy.failed.writes", "LAZY")
                .put("hoodie.write.concurrency.early.conflict.detection.enable", "true")

                .put("hoodie.write.lock.provider", "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider")
                .put("hoodie.write.lock.dynamodb.table", hudiLockTable)
                .put("hoodie.write.lock.dynamodb.partition_key", warehouseTableName)
                .put("hoodie.write.lock.dynamodb.region", AWSUtils.getCurrentRegion().getName())
                .put("hoodie.write.lock.dynamodb.endpoint_url", String.format("dynamodb.%s.amazonaws.com", AWSUtils.getCurrentRegion().getName()))
                .put("hoodie.write.lock.dynamodb.billing_mode", "PAY_PER_REQUEST");

        if (operation.equals("insert"))
        {
            hudiOptions.put("hoodie.datasource.write.insert.drop.duplicates", "true");
        }

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

Additional context

Add any other context about the problem here.

Stacktrace

Exception in thread "main" org.apache.hudi.exception.HoodieInsertException: Failed insert schema compatibility check
    at org.apache.hudi.table.HoodieTable.validateInsertSchema(HoodieTable.java:868)
    at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:165)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:218)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Failed to read schema/check compatibility for base path <S3 path>
    at org.apache.hudi.table.HoodieTable.validateSchema(HoodieTable.java:844)
    at org.apache.hudi.table.HoodieTable.validateInsertSchema(HoodieTable.java:866)
    ... 60 more
Caused by: org.apache.hudi.exception.SchemaCompatibilityException: Column dropping is not allowed

all schema comparisions 

    at org.apache.hudi.avro.AvroSchemaUtils.checkSchemaCompatible(AvroSchemaUtils.java:373)
    at org.apache.hudi.table.HoodieTable.validateSchema(HoodieTable.java:842)
    ... 61 more
SamarthRaval commented 4 months ago

While trying to do insert operation after bulk-insert, ran into above error.

Not sure what to do here ?

SamarthRaval commented 4 months ago

@xushiyan @ad1happy2go @bhasudha

Could you please help me here thank you.

danny0405 commented 4 months ago

Did you try to use the drop column statement?

ad1happy2go commented 4 months ago

@SamarthRaval hoodie.datasource.write.reconcile.schema should ideally handle that,. can you try removing hoodie.write.set.null.for.missing.columns.

ad1happy2go commented 4 months ago

@SamarthRaval Let's try to reproduce with sample dataset if possible.

SamarthRaval commented 4 months ago

@SamarthRaval hoodie.datasource.write.reconcile.schema should ideally handle that,. can you try removing hoodie.write.set.null.for.missing.columns.

yes actually it should handle it, even if I have few columns missing from writeSchema.

Problem is for other customer it does work with same configurations, with no problem at all.

SamarthRaval commented 4 months ago

@SamarthRaval hoodie.datasource.write.reconcile.schema should ideally handle that,. can you try removing hoodie.write.set.null.for.missing.columns.

I tried to follow this

image
SamarthRaval commented 4 months ago

Did you try to use the drop column statement?

No I am not dropping any column but when checked closing there are some columns which are missing, but shouldn't it automatically take care of it as per hoodie.datasource.write.reconcile.schema

SamarthRaval commented 4 months ago

@SamarthRaval Let's try to reproduce with sample dataset if possible.

Hello @ad1happy2go @danny0405

I was able to reproduce this in research, and was able to get exact same error.

In research with in-between column is missing, it throws above error.

My understanding was with reconcile.schema enabled, it will just populated null for missing column, but seems this not the case.

Any idea with this ?