apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Failed to insert overwrite hudi table when defining partition column with int type. #11623

Closed: leesf closed this issue 1 month ago

leesf commented 2 months ago


Describe the problem you faced

Insert overwrite into a Hudi table whose partition column is declared as int fails with java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer. The first run succeeds; re-running the same statements fails.

To Reproduce

Steps to reproduce the behavior:

  1. Run the code below:

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .appName("Spark Hudi Example")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
            .master("local[*]")
            .getOrCreate();

    // Recreate the table from scratch.
    spark.sql("drop table if exists insert_overwrite_partition purge");

    // The partition column `age` is declared as int.
    spark.sql("create table if not exists insert_overwrite_partition (\n" +
            "    id int,\n" +
            "    name string,\n" +
            "    age int\n" +
            ") using hudi partitioned by (age) tblproperties (primaryKey = 'id', preCombineField = 'age')\n" +
            "location '/tmp/insert_overwrite_partition/'");

    spark.sql("insert into insert_overwrite_partition values(1, 'a', 10000), (2, 'spark', 20000)");

    // Static overwrite of one partition of the int-typed column.
    spark.sql("insert overwrite insert_overwrite_partition partition(age=10) values(10, 'adb')");

    spark.sql("select * from insert_overwrite_partition").show();
  2. The first run succeeds, but running the same program again fails with the exception in the stacktrace below. A possible workaround sketch follows.
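Until this is fixed, one possible workaround (untested, and an assumption based on the type mismatch in the trace: values parsed back from the partition path are strings) is to declare the partition column as string, so the parsed values and the static partition predicate agree on type. The table and path names here are hypothetical:

    // Hypothetical workaround sketch: same layout, but with a string-typed
    // partition column, so values parsed from the partition path match the
    // type the partition predicate is bound to.
    spark.sql("create table if not exists insert_overwrite_partition_str (\n" +
            "    id int,\n" +
            "    name string,\n" +
            "    age string\n" +
            ") using hudi partitioned by (age) tblproperties (primaryKey = 'id', preCombineField = 'age')\n" +
            "location '/tmp/insert_overwrite_partition_str/'");

    spark.sql("insert overwrite insert_overwrite_partition_str partition(age='10') values(10, 'adb')");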

Expected behavior

The insert overwrite should succeed on every run, overwriting the age=10 partition without throwing a ClassCastException.

Environment Description

Additional context


Stacktrace

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:40)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:40)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:165)
    at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getAccessor$4(InternalRow.scala:138)
    at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getAccessor$4$adapted(InternalRow.scala:138)
    at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getAccessor$16(InternalRow.scala:159)
    at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getAccessor$16$adapted(InternalRow.scala:155)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:40)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:664)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
    at org.apache.hudi.SparkHoodieTableFileIndex.$anonfun$listMatchingPartitionPaths$8(SparkHoodieTableFileIndex.scala:253)
    at org.apache.hudi.SparkHoodieTableFileIndex.$anonfun$listMatchingPartitionPaths$8$adapted(SparkHoodieTableFileIndex.scala:253)
    at scala.collection.TraversableLike.noneIn$1(TraversableLike.scala:319)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:385)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.hudi.SparkHoodieTableFileIndex.listMatchingPartitionPaths(SparkHoodieTableFileIndex.scala:253)
    at org.apache.hudi.SparkHoodieTableFileIndex.getPartitionPaths(SparkHoodieTableFileIndex.scala:202)
    at org.apache.spark.sql.hudi.ProvidesHoodieConfig.deduceOverwriteConfig(ProvidesHoodieConfig.scala:389)
    at org.apache.spark.sql.hudi.ProvidesHoodieConfig.deduceOverwriteConfig$(ProvidesHoodieConfig.scala:353)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.deduceOverwriteConfig(InsertIntoHoodieTableCommand.scala:66)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:92)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:692)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:683)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:714)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:745)
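Reading the trace: SparkHoodieTableFileIndex.listMatchingPartitionPaths evaluates the static partition predicate (age = 10) against rows built from values parsed out of the partition paths, and those values are still UTF8Strings, while the predicate is bound to an int column. BaseGenericInternalRow.getInt then tries to unbox the UTF8String as an Integer and throws. A minimal, Hudi-free sketch of the same failure (the class name is mine, purely for illustration):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    public class PartitionCastRepro {
        public static void main(String[] args) {
            // A row holding the partition value as it is parsed from the
            // partition path "age=10": a string, not an int.
            InternalRow row = new GenericInternalRow(
                    new Object[]{UTF8String.fromString("10")});

            // A predicate bound to an int column reads the field with
            // getInt(0); BaseGenericInternalRow implements it by unboxing
            // the generic value, so a UTF8String triggers exactly the
            // ClassCastException in the trace above.
            int age = row.getInt(0);
            System.out.println(age);
        }
    }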
danny0405 commented 2 months ago

@KnightChess Are you interested in taking this fix?

KnightChess commented 2 months ago

@danny0405 I'll take a look in the next few days.