apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.35k stars 2.42k forks source link

[SUPPORT] Merge into command fails when trying to use only few columns in source data while using partial data payload #11138

Open pravin1406 opened 5 months ago

pravin1406 commented 5 months ago

HUDI version -> 0.14.1 Spark version -> 3.2.0 hadoop version -> 3.1.1 hive version -> 3.1.1

Hi I wanted to use partial data update payload. I have multiple sources, which all want to write into same hudi table. Each of these table do have 1 precombine and record key in common.

With reconcile schema set to true and payload set to partial data payload. I'm able to achieve this as reconcile schema takes the effort to condition my schema propertly when using datasource .

Bu same is not the case when using merge into with spark-sql. It gives me below error.

2024-05-02 02:38:02,771 ERROR io.HoodieAppendHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=pravin partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20240502023620823, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}', newLocation='HoodieRecordLocation {instantTime=20240502023755261, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}'} java.lang.ArrayIndexOutOfBoundsException: 8 at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.isNullAt(SpecificInternalRow.scala:241) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:128) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:118) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:118) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getInsertValue(ExpressionPayload.scala:247) at org.apache.hudi.common.model.HoodieAvroRecord.shouldIgnore(HoodieAvroRecord.java:173) at org.apache.hudi.io.HoodieAppendHandle.prepareRecord(HoodieAppendHandle.java:254) at org.apache.hudi.io.HoodieAppendHandle.writeToBuffer(HoodieAppendHandle.java:592) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:448) at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:338) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:260) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2024-05-02 02:38:03,375 ERROR hudi.HoodieSparkSqlWriterInternal: UPSERT failed with errors 2024-05-02 02:38:03,375 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) ... 47 elided On looking at the code i found 2 problems, when i created this table with CTAS query using all relevant options including table_properties. table_properties were not getting set inside the hive table. I had to set hoodie.datasource.hive_sync.table_properties again with all properties. But then also reconcile schema did not seem to work.

After debugging and code walk through i found reconcile schema to be among overriding properties. I read the comment there, but did not really understood the perspective behind it. This has kind of blocked me from using spark sql to partial update my records.

Can you guys explain why this is there? Also how can we make this work.

CTAS Query spark.sql("CREATE TABLE " + tablename + " USING org.apache.hudi " + " OPTIONS ( " + " primaryKey '" + recordkey + "', " + " path '/tmp/pravin/" + tablename + "', " + " hoodie.table.name '" + tablename + "', " + " hoodie.datasource.write.operation 'upsert', " + " hoodie.datasource.write.precombine.field '" + precombinekey + "', " + " hoodie.datasource.write.recordkey.field '" + recordkey + "', " + " hoodie.datasource.write.payload.class 'org.apache.hudi.common.model.PartialUpdateAvroPayload', " + " hoodie.datasource.write.table.type 'MERGE_ON_READ', " + " hoodie.enable.data.skipping 'true', " + " hoodie.datasource.write.reconcile.schema 'true', " + " hoodie.datasource.hive_sync.support_timestamp 'true', " + " hoodie.upsert.shuffle.parallelism '200', " + " hoodie.index.type 'SIMPLE', " + " hoodie.simple.index.update.partition.path 'true', " + " hoodie.datasource.write.hive_style_partitioning 'true', " + " hoodie.datasource.hive_sync.enable 'true', " + " hoodie.datasource.hive_sync.mode 'HMS', " + " hoodie.datasource.hive_sync.sync_comment 'true', " + " hoodie.datasource.hive_sync.database 'default', " + " hoodie.datasource.hive_sync.table_properties '" + tableProperties + "', " + " hoodie.datasource.hive_sync.table '" + tablename + "', " + " hoodie.schema.on.read.enable 'true' " + " ) as select * from merge_source " );

input data val df = Seq((9,"qwertyuiop","US","1","pravin","abcd","SFO")).toDF("EventTime","transactionId","Country","storeNbr","FullName","CompanyName","City")

merge command:

spark.sql("merge into partial_update_4_rt as target using merge_source as source on target."+recordkey +" = source."+recordkey+" when matched then update set City = source.City , CompanyName = source.CompanyName, EventTime = source.EventTime").show

@codope @ad1happy2go

xushiyan commented 5 months ago

@jonvex you have better context on this?

pravin1406 commented 5 months ago

@jonvex Any help on this ?

pravin1406 commented 5 months ago

@nsivabalan @jonvex Can you help on this ?

ad1happy2go commented 4 months ago

@pravin1406 We already had the JIRA Open for this . This is thhe issue with MERGE INTO for MOR tables - https://issues.apache.org/jira/browse/HUDI-6737

@jonvex Did we got a chance to fix it?

Code to reproduce -

CREATE TABLE merge_source (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING PARQUET;

INSERT INTO merge_source
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
PARTITIONED BY (city)
OPTIONS ( 
  primaryKey 'uuid', 
  hoodie.datasource.write.operation 'upsert', 
  hoodie.datasource.write.precombine.field 'ts', 
  hoodie.datasource.write.recordkey.field 'uuid',
  hoodie.table.name 'issue_11138',
  hoodie.datasource.write.table.type 'MERGE_ON_READ'
);

insert into hudi_table 
select * from merge_source;

merge into hudi_table as target 
using merge_source as source 
on target.uuid = source.uuid
when matched then update set target.city =  source.city , 
target.fare = source.fare, target.ts = source.ts;
pravin1406 commented 4 months ago

After debugging and code walk through i found reconcile schema to be among overriding properties. I read the comment there, but did not really understood the perspective behind it. This has kind of blocked me from using spark sql to partial update my records.

This part of the codebase, leads to the failure. I tested with removing reconcile schema form overriding properties, then it worked for me. But not fully aware of it's consequences

@ad1happy2go

pravin1406 commented 3 months ago

@ad1happy2go restarted work on this again, can you help ?

RYiHui commented 2 months ago

Any update?