apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Merge into does not work for MOR matched clause with extra condition #12180

Open Davis-Zhang-Onehouse opened 2 weeks ago

Davis-Zhang-Onehouse commented 2 weeks ago

Describe the problem you faced

Toy query suite, Hudi 0.14, Spark 3.4.

When I run MERGE INTO with an extra condition in the WHEN MATCHED clause, the statement fails with a "Merge into Hoodie table command failed" error:

MERGE INTO hudi_table_mor_2_partition_columns t
USING comprehensive_merge_source s
ON t.id = s.id
WHEN MATCHED 
  AND s.operation = 'UPDATE_DEPT_MATCH' 
  AND t.department = s.department 
THEN
  UPDATE SET *;

Full stack trace

Error: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:46)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:261)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:165)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:40)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:165)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:160)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:174)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:226)
    ... 16 more (state=,code=0)

Full query suite to reproduce the issue

 CREATE TEMPORARY VIEW comprehensive_merge_source AS
  SELECT * FROM (
    -- Update all columns based on department match
    SELECT 
      1 as id,
      'John Doe' as name,
      34.99 as price,
      1599490800 as ts,
      'john.smith@example.com' as email,
      'Engineering' as department,
      80000 as salary,
      to_date('2023-01-15') as hire_date,
      true as is_manager,
      'UPDATE_DEPT_MATCH' as operation
    UNION ALL
    -- Update based on salary condition
    SELECT
      3 as id,
      'Bob Smith' as name,
      44.99 as price,
      1599577200 as ts,
      'bob.smith@example.com' as email,
      'Engineering' as department,
      90000 as salary,
      to_date('2023-02-01') as hire_date,
      true as is_manager,
      'UPDATE_SALARY' as operation
    UNION ALL
    -- Delete record
    SELECT
      4 as id,
      'Alice Johnson' as name,
      0.0 as price,
      1599490800 as ts,
      'alice@example.com' as email,
      'Terminated' as department,
      0 as salary,
      CAST(NULL AS DATE) as hire_date,
      false as is_manager,
      'DELETE' as operation
    UNION ALL
    -- Insert new record (Engineering)
    SELECT
      10 as id,
      'Peter Parker' as name,
      29.99 as price,
      1599577200 as ts,
      'peter@example.com' as email,
      'Engineering' as department,
      60000 as salary,
      to_date('2022-09-01') as hire_date,
      false as is_manager,
      'INSERT' as operation
    UNION ALL
    -- Insert new record (Marketing)
    SELECT
      7 as id,
      'Emma Watson' as name,
      44.99 as price,
      1599577200 as ts,
      'emma.new@example.com' as email,
      'Marketing' as department,
      85000 as salary,
      to_date('2022-08-15') as hire_date,
      true as is_manager,
      'INSERT' as operation
  )
;
CREATE TABLE hudi_table_mor_2_partition_columns (
  id INT,
  name STRING,
  price DOUBLE,
  ts BIGINT
) USING hudi
LOCATION 's3a://bucket-fb543790/lakes/observed-default/sql/hudi_table_mor_2_partition_columns/'
PARTITIONED BY (id,name)
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
)
;
 INSERT INTO hudi_table_mor_2_partition_columns
PARTITION (id,name)
SELECT id, name, price, ts
FROM (
  SELECT 1 as id, 'John Doe' as name, 19.99 as price, 1598886000 as ts
  UNION ALL
  SELECT 2, 'Jane Doe', 24.99, 1598972400
  UNION ALL
  SELECT 3, 'Bob Smith', 14.99, 1599058800
)
;

INSERT INTO
hudi_table_mor_2_partition_columns
PARTITION (id,name)
SELECT id, name, price, ts
FROM (
  SELECT 4 as id, 'Alice Johnson' as name, 34.99 as price, 1599145200 as ts
)
;
 SELECT id, name, price, ts 
FROM hudi_table_mor_2_partition_columns 
ORDER BY id
;
 ALTER TABLE
hudi_table_mor_2_partition_columns
ADD COLUMN email STRING
;

 INSERT INTO hudi_table_mor_2_partition_columns
PARTITION (id,name)
SELECT id, name, price, ts, email
FROM (
  SELECT 6 as id,
         'Tom Hanks' as name,
          44.99 as price,
          1599318000 as ts,
          'tom@example.com' as email
)
;

 ALTER TABLE hudi_table_mor_2_partition_columns
ADD COLUMNS (
  department STRING,
  salary INT,
  hire_date DATE,
  is_manager BOOLEAN
)
;
 UPDATE hudi_table_mor_2_partition_columns
SET department = 'HR', salary = 60000, hire_date = to_date('2022-01-15'), is_manager = true
WHERE id = 1
;
 INSERT INTO hudi_table_mor_2_partition_columns
PARTITION (id,name)
SELECT id, name, price, ts, email, department, salary, hire_date, is_manager
FROM (
  SELECT 
    6 as id, 
    'Tom Hanks' as name, 
    44.99 as price, 
    1599318000 as ts,
    'tom@example.com' as email, 
    'Sales' as department, 
    75000 as salary,
    to_date('2022-03-01') as hire_date, 
    true as is_manager
)
;
 INSERT INTO hudi_table_mor_2_partition_columns
(id, name, price, ts, email, department, hire_date)
VALUES (
  7 as id,
  'Emma Watson' as name,
  39.99 as price,
  1599404400 as ts,
  'emma@example.com' as email,
  'Marketing' as department,
  CAST('2022-03-02' AS DATE) as hire_date)
;

MERGE INTO hudi_table_mor_2_partition_columns t
USING comprehensive_merge_source s
ON t.id = s.id
WHEN MATCHED 
  AND s.operation = 'UPDATE_DEPT_MATCH' 
  AND t.department = s.department 
THEN
  UPDATE SET *
WHEN MATCHED 
  AND s.operation = 'UPDATE_SALARY'
  AND s.salary > t.salary 
THEN
  UPDATE SET *
WHEN MATCHED 
  AND s.operation = 'DELETE' 
THEN
  DELETE
WHEN NOT MATCHED 
  AND s.operation = 'INSERT'
  AND (
    s.department = 'Engineering' 
    OR s.salary >= 70000
  )
THEN
  INSERT *
;

danny0405 commented 2 weeks ago

@beyond1920 Maybe you can share some insights here.

ad1happy2go commented 1 week ago

@Davis-Zhang-Onehouse It looks like referencing the target (main) table in the extra conditions of a WHEN MATCHED clause is currently not allowed. I confirmed that a query like the one below, whose extra conditions reference only the source, works fine:

MERGE INTO hudi_table_mor_2_partition_columns t
USING comprehensive_merge_source s
ON t.id = s.id
WHEN MATCHED 
  AND s.operation = 'UPDATE_DEPT_MATCH' 
  AND s.department = ''
THEN
  UPDATE SET *
WHEN MATCHED 
  AND s.operation = 'UPDATE_SALARY'
  AND s.salary > 0
THEN
  UPDATE SET *
WHEN MATCHED 
  AND s.operation = 'DELETE' 
THEN
  DELETE
WHEN NOT MATCHED 
  AND s.operation = 'INSERT'
  AND (
    s.department = 'Engineering' 
    OR s.salary >= 70000
  )
THEN
  INSERT *
;

Created a JIRA to track the fix: https://issues.apache.org/jira/browse/HUDI-8457
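
Until that fix lands, one possible workaround for the single-clause case from the original report is to move the target-side comparison out of the MERGE and into the source. The sketch below is untested against Hudi 0.14 and rests on two assumptions: that the source is allowed to read from the table being merged into, and that `id` is unique in the target (it is declared as `primaryKey`), so the join does not duplicate source rows. The view name `dept_match_source` is made up for illustration.

```sql
-- Hypothetical helper view: keep only the source rows whose department
-- already matches the target row with the same id. The t.department
-- comparison happens here instead of inside WHEN MATCHED.
CREATE OR REPLACE TEMPORARY VIEW dept_match_source AS
SELECT s.*
FROM comprehensive_merge_source s
JOIN hudi_table_mor_2_partition_columns t
  ON t.id = s.id
WHERE s.operation = 'UPDATE_DEPT_MATCH'
  AND t.department = s.department;

-- The MERGE no longer needs any extra condition on the target table.
MERGE INTO hudi_table_mor_2_partition_columns t
USING dept_match_source s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET *;
```

Because the view selects `s.*`, its schema is the same as `comprehensive_merge_source`, so `UPDATE SET *` sees the same columns as in the queries above. The multi-clause MERGE would need one such pre-filter per target-side condition, which is why this is only a stopgap.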