apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]When saveAsTable: java.lang.IllegalArgumentException: Partition-path field has to be non-empty! #12009

Closed: bithw1 closed this issue 1 week ago

bithw1 commented 1 month ago

Hi, I am using 0.15.0 and running the following code snippet in the spark-shell, trying to save a Spark DataFrame as a Hudi table.

When I run the code, an exception occurs; the full stack trace is pasted below. The main problem is: Caused by: java.lang.IllegalArgumentException: Partition-path field has to be non-empty! But I have set the partitionField, so I am not sure where the problem is.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.{ComplexKeyGenerator,SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.hudi.command.UuidKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import spark.implicits._

  def save2HudiSyncHiveWithPrimaryKey(df: DataFrame, databaseName: String, tableName: String, primaryKey: String, preCombineField: String,
                                      partitionField: String, operation: String, mode: SaveMode): Unit = {
    println("partitionField:" + partitionField)
    println("PARTITIONPATH_FIELD.key:" + PARTITIONPATH_FIELD.key)   
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key, primaryKey). 
      option(PRECOMBINE_FIELD.key, preCombineField).
      option(PARTITIONPATH_FIELD.key, partitionField).
      option(TBL_NAME.key, tableName).
      option(KEYGENERATOR_CLASS_NAME.key(), classOf[SimpleKeyGenerator].getName). 
      option(OPERATION.key(), operation).
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      mode(mode)
      .saveAsTable(databaseName + "." + tableName )
  }

    val df = Seq((1, "a1", 10, 1000, "2022-05-12")).toDF("id", "name", "value", "ts", "dt")
    val databaseName = "default"
    val tableName1 = "test_hudi_table_7"
    val primaryKey = "id"
    val preCombineField = "ts"
    val partitionField = "dt"
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
    UPSERT_OPERATION_OPT_VAL, Overwrite)

The full exception is:

scala>     save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
     |     UPSERT_OPERATION_OPT_VAL, Overwrite)
partitionField:dt
PARTITIONPATH_FIELD.key:hoodie.datasource.write.partitionpath.field
24/09/26 10:56:13 WARN TableSchemaResolver: Could not find any data file written for commit, so could not get schema for table hdfs://hadoop.master:9000/user/hive/warehouse/test_hudi_table_7
org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.keygen.SimpleKeyGenerator
  at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
  at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123)
  at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:94)
  at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:83)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:264)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
  at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:100)
  at org.apache.spark.sql.hudi.command.CreateHoodieTableAsSelectCommand.run(CreateHoodieTableAsSelectCommand.scala:106)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:701)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:679)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
  at save2HudiSyncHiveWithPrimaryKey(<console>:209)
  ... 96 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: Partition-path field has to be non-empty!
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73)
  ... 133 more
Caused by: java.lang.IllegalArgumentException: Partition-path field has to be non-empty!
  at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
  at org.apache.hudi.keygen.SimpleKeyGenerator.validatePartitionPath(SimpleKeyGenerator.java:114)
  at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:54)
  at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:42)
  ... 138 more
ad1happy2go commented 1 month ago

@bithw1 Why are you setting this:
option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey"). Remove this option and try again.

ad1happy2go commented 1 month ago

For the primary key, you are already setting option(RECORDKEY_FIELD.key, primaryKey).
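
For reference, here is a minimal sketch of the suggested change, assuming the same imports and variables as in your snippet above (the only difference is that the serde-properties option is dropped):

    df.write.format("hudi").
      option(RECORDKEY_FIELD.key, primaryKey).
      option(PRECOMBINE_FIELD.key, preCombineField).
      option(PARTITIONPATH_FIELD.key, partitionField).
      option(TBL_NAME.key, tableName).
      option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName).
      option(OPERATION.key, operation).
      // HIVE_TABLE_SERDE_PROPERTIES intentionally removed, per the suggestion above
      mode(mode).
      saveAsTable(databaseName + "." + tableName)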

bithw1 commented 1 month ago

@bithw1 Why are you setting this: option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey"). Remove this option and try again.

Thanks @ad1happy2go, let me try.

ad1happy2go commented 1 week ago

@bithw1 Was the suggestion helpful, and did it work? Feel free to close this issue if you are all set.