dbt-labs / dbt-spark

dbt-spark contains all of the code enabling dbt to work with Apache Spark and Databricks
https://getdbt.com
Apache License 2.0

[ADAP-589] [Bug] Incremental load using Hudi fails #791

Closed: skamalj closed this issue 10 months ago

skamalj commented 1 year ago

Is this a new bug in dbt-spark?

Current Behavior

When using the 'merge' strategy with 'hudi', the initial load (the first run) works fine. Subsequent incremental runs fail with the error below.

org.apache.spark.sql.AnalysisException: cannot resolve _hoodie_commit_time in INSERT clause given columns DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.firstname, DBT_INTERNAL_SOURCE.lastname, DBT_INTERNAL_SOURCE.phone, DBT_INTERNAL_SOURCE.email, DBT_INTERNAL_SOURCE.pincode, DBT_INTERNAL_SOURCE.joiningdate, DBT_INTERNAL_SOURCE.eventtime, DBT_INTERNAL_SOURCE.dept; line 15 pos 2

(The full stack trace is in the Relevant log output section below.)

My source format is 'json', so the source will not have Hudi's additional metadata columns (e.g. _hoodie_commit_time).
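For illustration, here is a simplified sketch of the kind of merge statement dbt-spark compiles in this case. The relation names are placeholders on my side; the aliases and the failing column come from the error above. Presumably Spark expands insert * against the target table's schema, and a Hudi target carries metadata columns such as _hoodie_commit_time that the dbt temp source does not have, so the INSERT clause cannot resolve them:

-- hypothetical sketch of the compiled merge (my_hudi_model is a placeholder)
merge into my_schema.my_hudi_model as DBT_INTERNAL_DEST
using my_hudi_model__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id  -- assuming unique_key = 'id'
when matched then update set *
when not matched then insert *  -- fails: cannot resolve _hoodie_commit_time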

Expected Behavior

Subsequent loads with a source format other than 'hudi' and a 'hudi' destination should execute without error.

Steps To Reproduce

  1. Create the initial source table with a format other than Hudi. I am testing on EMR.
  2. Run the model to create the Hudi model (a minimal model sketch is shown after these steps).
  3. Load additional data into the source.
  4. Run the Hudi model again.

The failure occurs at step 4.
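For concreteness, a minimal model along these lines reproduces the setup. The model and source names, the unique_key, and the incremental filter are assumptions on my side; the column list comes from the error above.

{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        file_format='hudi',
        unique_key='id'
    )
}}

-- JSON-backed source, so no Hudi metadata columns are present
select id, firstname, lastname, phone, email,
       pincode, joiningdate, eventtime, dept
from {{ source('raw', 'customers') }}  -- hypothetical source

{% if is_incremental() %}
-- only pull rows newer than what is already loaded
where eventtime > (select max(eventtime) from {{ this }})
{% endif %}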

Relevant log output

org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: cannot resolve _hoodie_commit_time in INSERT clause given columns DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.firstname, DBT_INTERNAL_SOURCE.lastname, DBT_INTERNAL_SOURCE.phone, DBT_INTERNAL_SOURCE.email, DBT_INTERNAL_SOURCE.pincode, DBT_INTERNAL_SOURCE.joiningdate, DBT_INTERNAL_SOURCE.eventtime, DBT_INTERNAL_SOURCE.dept; line 15 pos 2
07:51:10        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
07:51:10        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
07:51:10        at java.security.AccessController.doPrivileged(Native Method)
07:51:10        at javax.security.auth.Subject.doAs(Subject.java:422)
07:51:10        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
07:51:10        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
07:51:10        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
07:51:10        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
07:51:10        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
07:51:10        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
07:51:10        at java.lang.Thread.run(Thread.java:750)
07:51:10      Caused by: org.apache.spark.sql.AnalysisException: cannot resolve _hoodie_commit_time in INSERT clause given columns DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.firstname, DBT_INTERNAL_SOURCE.lastname, DBT_INTERNAL_SOURCE.phone, DBT_INTERNAL_SOURCE.email, DBT_INTERNAL_SOURCE.pincode, DBT_INTERNAL_SOURCE.joiningdate, DBT_INTERNAL_SOURCE.eventtime, DBT_INTERNAL_SOURCE.dept; line 15 pos 2

Environment

- OS: Ubuntu 20.04.3 LTS
- Python: 3.8.13
- dbt-core: 1.6.0b2 
- dbt-spark: 1.6.0b1

Additional Context

I am running this on AWS, with S3 for storage and EMR for compute.
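If it helps with reproduction outside EMR, the same failure should be reachable with plain Spark SQL on any cluster with the Hudi bundle and its SQL extensions enabled. A rough sketch (table names are placeholders, and I have not verified this exact snippet):

-- hypothetical standalone repro: a Hudi target gains metadata columns,
-- while a non-Hudi source does not
create table dest_hudi (id int, eventtime timestamp) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'eventtime');

create table src_plain (id int, eventtime timestamp) using parquet;
insert into src_plain values (1, current_timestamp());

-- mirrors the merge dbt compiles: insert * expands against the target
-- schema, which includes _hoodie_commit_time, absent from src_plain
merge into dest_hudi
using src_plain
on src_plain.id = dest_hudi.id
when matched then update set *
when not matched then insert *;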

Fleid commented 1 year ago

@JCZuurmond would you mind taking a look at this one? I don't have an easy way to try to reproduce :/

github-actions[bot] commented 10 months ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

github-actions[bot] commented 10 months ago

Although we are closing this issue as stale, it's not gone forever. Issues can be reopened if there is renewed community interest. Just add a comment to notify the maintainers.