Closed — CTTY closed this issue 1 month ago
@xicm I agree, and I think we should remove the precombine field from the datasource config if we don't want to allow users to change the precombine fields of their tables.
@CTTY what is the case where we don't want to allow users to change the precombine fields? I use the config to set the field I'd like to use, e.g. "_etl_cluster_ts". If it is removed, how can I set my custom precombine field?
So, this issue is about "INSERT INTO should work on tables created with dataframe api"?
A table created by the dataframe API will initialize `hoodie.table.precombine.field`. When we use `INSERT INTO` in Spark SQL, we don't set `hoodie.table.precombine.field` or `hoodie.datasource.write.precombine.field`, but `buildHoodieInsertConfig` sets precombine to "" if the precombine field is not set. During validation, the precombine field in the table properties has its real value while the value in the write params is "", so an exception is thrown. So we should either remove that code in `ProvidesHoodieConfig` or let the default value be null rather than an empty string.
I posted a quick patch to fix the issue, but ideally I think Hudi should remove all write configs that users are not allowed to change, or point those write configs at the equivalent table configs. Then we wouldn't even need the validation logic here: https://github.com/apache/hudi/blob/64f546b8f0cae70793a6150170a649bad8e0e146/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala#L165
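To make the conflict concrete, here is a minimal sketch (hypothetical stand-in code, not Hudi's actual implementation) of why the check fires for a plain `INSERT INTO`, and how treating an empty value as "not set" would avoid it:

```scala
// Sketch of the precombine validation mismatch described above.
// tableConfigValue stands in for hoodie.table.precombine.field (set at
// table creation); writeParamValue stands in for what
// buildHoodieInsertConfig fills in when Spark SQL sets nothing.
object PrecombineConflictSketch {
  val tableConfigValue: String = "event_ts" // real field from table creation
  val writeParamValue: String = ""          // "" defaulted by the SQL write path

  // Current behavior, sketched: any mismatch counts as a conflict,
  // including "" vs. "event_ts".
  def conflicts(tableValue: String, paramValue: String): Boolean =
    paramValue != tableValue

  // Sketch of the proposed fix: treat null/empty as "not set", so the
  // table config simply wins and no conflict is raised.
  def conflictsIfEmptyMeansUnset(tableValue: String, paramValue: String): Boolean =
    paramValue != null && paramValue.nonEmpty && paramValue != tableValue
}
```

With this change, an explicit mismatching value (e.g. a user setting a different precombine field) would still be rejected, while the defaulted "" would not.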
Is there any workaround to this that doesn't require a patch? This unfortunately makes Hudi unusable with dbt-spark (for us), because every `INSERT INTO` statement breaks on this validation check on the precombine field.
Edit: I think this affects all tables that are created with a precombine field, not just those created through the DataFrame API; e.g. https://github.com/apache/hudi/issues/10626 has an example where precombine is set in dbt-spark.
@mzheng-plaid have you tried `spark.sql("set hoodie.datasource.write.precombine.field=<precombine>")` in your session?
@CTTY thanks for the suggestion, I'll try that tomorrow
Yes, that worked, thanks @CTTY
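For anyone else hitting this: the workaround is a session-level setting, so it has to run in the same Spark session before the failing statement. A sketch in spark-shell terms (the field name `event_ts` matches the repro in the issue body below; substitute your table's precombine field):

```scala
// Session-level workaround (sketch, not an official fix): point the write
// config at the same field the table was created with, so the table config
// and the write params agree and the validation check passes.
spark.sql("set hoodie.datasource.write.precombine.field=event_ts")

// Subsequent SQL writes in this session no longer trip the conflict check:
spark.sql("""
  INSERT INTO tableName (event_id, event_date, event_name, event_ts, event_type)
  VALUES ('131', '2015-01-01', 'event_name_567', '2015-01-01T13:51:45.369689Z', 'type2')
""")
```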
Describe the problem you faced
`INSERT INTO` queries fail on a table created with the dataframe API due to a config conflict. The exception below shows a conflict on the precombine field, but I believe this can happen with any datasource config.

To Reproduce
Steps to reproduce the behavior:
```scala
import org.apache.spark.sql.SaveMode

val df1 = Seq(
  ("100", "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
  ("101", "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  ("102", "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z", "type3"),
  ("103", "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z", "type4"),
  ("104", "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  ("105", "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2"),
  ("106", "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z", "type3"),
  ("107", "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z", "type4"),
  ("108", "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z", "type1"),
  ("109", "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z", "type2"),
  ("110", "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z", "type3"),
  ("111", "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")

val r = scala.util.Random
val num = r.nextInt(99999)
var tableName = "tableName" + num
var tablePath = "table path"

df1.write.format("hudi")
  .option("hoodie.metadata.enable", "true")
  .option("hoodie.table.name", tableName)
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
  .option("hoodie.datasource.write.partitionpath.field", "event_type")
  .option("hoodie.datasource.write.precombine.field", "event_ts")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  // .option("hoodie.datasource.write.hive_style_partitioning", "true")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.meta.sync.enable", "true")
  .option("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool")
  .option("hoodie.datasource.hive_sync.mode", "hms")
  .option("hoodie.datasource.hive_sync.database", "default")
  .option("hoodie.datasource.hive_sync.table", tableName)
  .option("hoodie.datasource.hive_sync.partition_fields", "event_type")
  .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)
```
```sql
INSERT INTO table_name (event_id, event_date, event_name, event_ts, event_type)
VALUES ('131', '2015-01-01', 'event_name_567', '2015-01-01T13:51:45.369689Z', 'type2')
```
```
org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PreCombineKey: event_ts
  at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:212)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:249)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
  at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
  at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
```