apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] insert into hudi table with columns specified(reordered and not in table schema order) throws exception #11552

Closed: leesf closed this issue 2 months ago

leesf commented 3 months ago

Describe the problem you faced

Running insert into on a Hudi table with an explicit column list whose order differs from the table schema (here (id, price, name) against a schema of (id, name, price)) throws an AnalysisException. The column list appears to be dropped and the values matched positionally against the schema, so the string 'aaa' lands in the price column and fails the string-to-double cast.

To Reproduce

Steps to reproduce the behavior:

  1. Run the following Spark code:

        SparkSession spark = SparkSession
                .builder()
                .appName("Spark Data Lake Test")
                .master("local[*]")
                .config("spark.sql.catalogImplementation", "in-memory")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
                .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
                .getOrCreate();
    
        spark.sql("create table if not exists h3(\n" +
                "id bigint,\n" +
                "name string,\n" +
                "price double \n" +
                ") using hudi\n" +
                "options (\n" +
                "primaryKey = 'id',\n" +
                "type = 'mor',\n" +
                "preCombineField = 'name'\n" +
                ") location 'file:///tmp/hudi_test/';");
    
        spark.sql("insert into h3(id, price, name) values(1, 12.2, 'aaa');");
    
        spark.sql("select * from h3;").show(false);
    
        spark.stop();
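
A workaround that appears to avoid the positional mismatch (a sketch based on the behavior reported here, not verified across Hudi versions) is to list the columns in the table's schema order, or to omit the column list entirely:

        // Workaround sketch: list the columns in schema order (id, name, price)
        spark.sql("insert into h3(id, name, price) values(1, 'aaa', 12.2);");

        // or omit the column list so the values map positionally by design
        spark.sql("insert into h3 values(1, 'aaa', 12.2);");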

Expected behavior

The insert should succeed, with each value mapped to the column named in the insert list: id = 1, name = 'aaa', price = 12.2, and the subsequent select should return that row.
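
With the column list honored, the final select should print something like the following (Hudi metadata columns elided):

    +---+----+-----+
    |id |name|price|
    +---+----+-----+
    |1  |aaa |12.2 |
    +---+----+-----+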

Environment Description

(Not filled in by the reporter. The HoodieSpark33CatalystPlanUtils frames in the stacktrace below indicate Spark 3.3.)


Stacktrace


Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'default.h3':
- Cannot safely cast 'price': string to double
    at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotWriteIncompatibleDataToTableError(QueryCompilationErrors.scala:1535)
    at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:59)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:50)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns$(HoodieSpark3CatalystPlanUtils.scala:45)
    at org.apache.spark.sql.HoodieSpark33CatalystPlanUtils$.resolveOutputColumns(HoodieSpark33CatalystPlanUtils.scala:34)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:167)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:144)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:98)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:780)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:780)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:623)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:780)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:618)

danny0405 commented 3 months ago

@jonvex do you have any insights here?

KnightChess commented 3 months ago

It looks like the Hudi analysis rules do not implement the user-specified column list. @leesf are you working on the fix? I plan to take this up if you have not already.
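
Conceptually, the missing step is a by-name projection of the query output into the table's schema order before Spark's positional TableOutputResolver validates the casts. A minimal user-side sketch of the same idea, assuming the session from the repro above and imports of org.apache.spark.sql.Dataset and org.apache.spark.sql.Row (an illustration of the alignment, not Hudi's actual fix):

        // Illustration only: align columns by name before a positional insert.
        // The source columns arrive as (id, price, name)...
        Dataset<Row> source = spark.sql("select 1 as id, 12.2 as price, 'aaa' as name");
        // ...reorder them by name into the table schema order (id, name, price)...
        Dataset<Row> aligned = source.select("id", "name", "price");
        // ...so the positional resolver now sees the types in the right order.
        aligned.createOrReplaceTempView("src");
        spark.sql("insert into h3 select * from src");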

KnightChess commented 3 months ago

Created a ticket to track it: https://issues.apache.org/jira/browse/HUDI-7949

danny0405 commented 3 months ago

@KnightChess Thanks so much for taking care of the fix

danny0405 commented 2 months ago

Closing because it is fixed in: https://github.com/apache/hudi/pull/11568