apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink Hudi Job Configuration and Parameter Conflict Issues #12024

Open usberkeley opened 1 month ago

usberkeley commented 1 month ago

Describe the problem you faced

  1. Configuration Conflict in Flink Hudi Job: When modifying the configuration of an existing Flink Hudi Job, if there is a conflict with the Table Config (hoodie.properties), the job does not throw an error. Instead, it silently overrides the user configuration with the previous Table Config settings.
  2. Parameter Conflict Handling: When certain parameters conflict, the job does not check for these conflicts at startup. Errors are only thrown during runtime, which delays problem detection.
  3. Flink SQL Keywords Conflict: When the Flink SQL keywords PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

To Reproduce

Steps to reproduce the behavior:

  1. Configuration Conflict in Flink Hudi Job

    1. Start a Flink Hudi Job and create hoodie.properties.
    2. Modify the Flink Hudi Table configuration and restart the job.
    3. Observe that the modifications do not take effect and the job starts normally without errors.
  2. Parameter Conflict Handling

    1. Configure the record key field with two fields: hoodie.datasource.write.recordkey.field = 'uuid,name'.
    2. Configure a mismatched key generator: hoodie.datasource.write.keygenerator.class='org.apache.hudi.keygen.SimpleAvroKeyGenerator'.
    3. The job throws an error only at runtime, indicating that the Avro record cannot find the field 'uuid,name' (see the key generator note after the reproduction steps).
  3. Flink SQL Keywords Conflict

    1. Create a Flink table:
      CREATE TABLE t_test (
      `uuid` VARCHAR(20),
      `name` VARCHAR(10),
      `age` INT,
      `ts` TIMESTAMP(3),
      `partition` VARCHAR(10),
      PRIMARY KEY (uuid, name) NOT ENFORCED
      )
      PARTITIONED BY (`partition`)
      WITH (
      'hoodie.datasource.write.recordkey.field' = 'age',
      'hoodie.datasource.write.partitionpath.field' = 'name'
      );
    2. Observe that the job does not throw an error and internally prioritizes PRIMARY KEY and PARTITIONED BY over the Hoodie Config settings.
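
Regarding the key generator mismatch in step 2 above: SimpleAvroKeyGenerator expects a single record key field, so 'uuid,name' is treated as one literal field name that does not exist in the schema. As a minimal sketch (option values only, not a full job configuration), a consistent pairing for a composite key would look like:

// Sketch: a multi-field record key paired with a complex key generator instead of
// SimpleAvroKeyGenerator, which handles exactly one record key field.
val consistentKeyOptions = Map(
  "hoodie.datasource.write.recordkey.field"    -> "uuid,name",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexAvroKeyGenerator"
)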

Expected behavior

Discussion item: I'd like to ask whether we should strictly check for configuration conflicts. Should we directly report an error in case of a conflict, rather than internally modifying user parameters?

I prefer directly reporting an error. My reasoning: if we don't report an error, users may mistakenly believe their configuration has taken effect, which can lead to confusion.
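
For illustration, the kind of up-front check I have in mind might look like the sketch below (hypothetical helper, not an existing Hudi API): compare the options supplied by the user against what is already persisted in hoodie.properties and fail fast on any mismatch.

import org.apache.hudi.exception.HoodieException

// Hypothetical sketch of a strict conflict check between user-supplied write options
// and the values already persisted in hoodie.properties.
def validateAgainstTableConfig(userConf: Map[String, String],
                               tableConf: Map[String, String]): Unit = {
  val conflicts = userConf.collect {
    case (key, value) if tableConf.get(key).exists(_ != value) =>
      s"$key: current=$value, existing=${tableConf(key)}"
  }
  if (conflicts.nonEmpty) {
    throw new HoodieException("Config conflict (key: current value vs existing value):\n" +
      conflicts.mkString("\n"))
  }
}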

Environment Description

rangareddy commented 1 month ago

Hi @usberkeley

Thank you for reporting this issue. I was able to replicate it using Spark as well. Specifically, I created a table using Spark SQL with primary key columns id and name, then inserted data via a DataFrame (df) with record key columns id and age instead of id and name.

Spark Code:

package com.ranga

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object Test12024 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  spark.sql(
    """
      |CREATE TABLE IF NOT EXISTS t_test (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )

  val tableName = name
  val basePath = f"file:///tmp/$tableName"
  val hoodieConf = scala.collection.mutable.Map[String, String]()
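  // Record key ('id,age') deliberately conflicts with the table's primaryKey ('id,name')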
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
  input_df.write.format("hudi").
    options(hoodieConf).
    mode("overwrite").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)

  spark.stop()
}
rangareddy commented 1 month ago

Created an upstream Jira to track the issue:

https://issues.apache.org/jira/browse/HUDI-8278

rangareddy commented 1 month ago

I later tested the append mode as well and encountered the same issue.

danny0405 commented 1 month ago

PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

This is by-design.

usberkeley commented 1 month ago

PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

This is by-design.

@danny0405 Thank you. I understand that PRIMARY KEY takes priority over the Hoodie config. However, when the two configurations conflict, this can confuse the user.

In this case, should we throw an error directly, informing the user of the conflict and asking them to correct the configuration?

danny0405 commented 1 month ago

In this case, should we throw an error directly, informing the user of the conflict and asking them to correct the configuration?

Maybe we just log some warnings there.
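
As a rough illustration of that option (hypothetical code, not the actual connector implementation), the override could be surfaced like this whenever the DDL and the hoodie.* options disagree:

import org.slf4j.LoggerFactory

// Hypothetical sketch: warn when PRIMARY KEY from the SQL DDL overrides an explicitly
// configured hoodie.datasource.write.recordkey.field instead of failing the job.
val log = LoggerFactory.getLogger("hudi-config-conflict")

def warnOnDdlOverride(ddlRecordKey: String, configuredRecordKey: Option[String]): Unit = {
  configuredRecordKey.filter(_ != ddlRecordKey).foreach { configured =>
    log.warn("hoodie.datasource.write.recordkey.field='{}' is overridden by PRIMARY KEY ('{}')",
      configured, ddlRecordKey)
  }
}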

rangareddy commented 1 month ago

Tested the same code using Hive Sync. No issue is reported while writing or reading.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val name = "Hudi_Test"
val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

spark.sql(
    """
      |CREATE TABLE IF NOT EXISTS t_test (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
))

val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
)

val tableName = name
val databaseName = "test"
val basePath = f"file:///tmp/$tableName"

val hoodieConf = Map(
    "hoodie.datasource.write.recordkey.field" -> "id,age",
    "hoodie.datasource.write.recordkey.field" -> "id,age",
    "hoodie.table.precombine.field" -> "ts",
    "hoodie.table.name" -> tableName,
    "hoodie.database.name" -> databaseName,
    "hoodie.datasource.meta.sync.enable" -> "true",
    "hoodie.datasource.hive_sync.enable" -> "true",
    "hoodie.datasource.hive_sync.table" -> tableName,
    "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc" -> "false",
    "hoodie.datasource.hive_sync.mode" -> "hms",
    "hoodie.datasource.write.hive_style_partitioning" -> "true"
)

val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
input_df.write.format("hudi").options(hoodieConf).mode("overwrite").save(basePath)

spark.read.format("hudi").load(basePath).show(false)
rangareddy commented 1 month ago

Hi @usberkeley

I got the expected exception when the same location is specified both when creating the table and when saving the data.

Exception:

24/10/17 12:10:12 INFO HoodieTableConfig: Loading table properties from file:/tmp/hudi/Test12114/.hoodie/hoodie.properties
24/10/17 12:10:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///tmp/hudi/Test12114
Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key   current value   existing value):
RecordKey:  id,age  id,name
    at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)

Code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object Test12024 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  val tableName = name
  val basePath = f"file:///tmp/hudi/$tableName"

  spark.sql(
    f"""
      |CREATE TABLE IF NOT EXISTS ${tableName} (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '${basePath}'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )

  val hoodieConf = scala.collection.mutable.Map[String, String]()
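  // Conflicts with the primaryKey ('id,name') persisted in hoodie.properties, triggering the exception above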
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)

  input_df.write.format("hudi").
    options(hoodieConf).
    mode("append").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)

  spark.stop()
}
rangareddy commented 1 month ago

Hi @usberkeley

Please let me know if there is any update.

usberkeley commented 1 month ago

Hi @usberkeley

Please let me know if there is any update.

@rangareddy Wow, that's great! From the code, it seems Spark has checks in place. Could you please help take a look at Flink as well?
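
In case it helps, the end-to-end Flink repro I have in mind is roughly the sketch below (the path and connector options are placeholders, and the hudi Flink bundle is assumed to be on the classpath):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch of a Flink SQL repro for the PRIMARY KEY / recordkey.field conflict:
// the job starts without error; inspect hoodie.properties afterwards to see which
// record key and partition path actually won.
object FlinkTest12024 extends App {
  val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

  tEnv.executeSql(
    """
      |CREATE TABLE t_test (
      |  `uuid` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` TIMESTAMP(3),
      |  `partition` VARCHAR(10),
      |  PRIMARY KEY (uuid, name) NOT ENFORCED
      |)
      |PARTITIONED BY (`partition`)
      |WITH (
      |  'connector' = 'hudi',
      |  'path' = 'file:///tmp/hudi/t_test',
      |  'hoodie.datasource.write.recordkey.field' = 'age',
      |  'hoodie.datasource.write.partitionpath.field' = 'name'
      |)
      |""".stripMargin)

  tEnv.executeSql(
    "INSERT INTO t_test VALUES ('id1', 'alice', 30, TIMESTAMP '2024-10-17 10:00:00', 'par1')"
  ).await()
}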

rangareddy commented 1 month ago

Hi @usberkeley

Have you tried to replicate the issue from Flink?

https://github.com/apache/hudi/issues/12024#issuecomment-2418667373