apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Cannot write nullable values to non-null column #9042

Closed: dht7 closed this issue 12 months ago

dht7 commented 1 year ago

Describe the problem you faced

Apache Hudi tables created with a CTAS command in Spark SQL and containing array-typed columns fail on INSERT OVERWRITE.

To Reproduce

Steps to reproduce the behavior:

  1. Run the following create statement via Spark-SQL (or Spark-Thrift server):

    create table test_database.test_table using hudi options (type="cow", primaryKey='id', hoodie.table.name="test_table") location '<GCS bucket location>' as select id, array_column1 from test_database.source_table;

    Here, source_table is also a Hudi table.

  2. Once the table is created, try to update the data by running the following query:

    insert overwrite table test_database.test_table select id, array_column1 FROM test_database.source_table;

    Running the above query results in an org.apache.spark.sql.AnalysisException (complete stack trace attached below).

Expected behavior

The INSERT OVERWRITE query should execute on the target table without any errors or exceptions.

Environment Description

  * Hudi version: 0.12.2

Additional context

Error: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'test_database.test_table':
- Cannot write nullable elements to array of non-nulls: 'array_column1'
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:362)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:264)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:78)
    at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:62)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:264)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:273)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'test_database.test_table':
- Cannot write nullable elements to array of non-nulls: 'array_column1'
    at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:73)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:45)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns$(HoodieSpark3CatalystPlanUtils.scala:40)
    at org.apache.spark.sql.HoodieSpark31CatalystPlanUtils$.resolveOutputColumns(HoodieSpark31CatalystPlanUtils.scala:25)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:164)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:145)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:99)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:326)
    ... 16 more (state=,code=0)
ad1happy2go commented 1 year ago

@dht7 I was able to reproduce this issue.

The root cause was that the source table had the column marked as nullable = false, while the Hudi table had it as nullable = true.

Source table: (schema screenshot)

Target table: (schema screenshot)

Spark SQL code:

create database test_database1;
create table test_database1.source_table
as select 1 as id, array(1,2,3) as array_column1;
create table test_database1.test_table using hudi options (type="cow", primaryKey='id', hoodie.table.name="test_table")
select id, array_column1 from test_database1.source_table;
insert overwrite table test_database1.test_table select id, array_column1 FROM test_database1.source_table;
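
The differing element nullability can also be confirmed from a spark-shell; a minimal sketch, assuming the table and column names above:

    import org.apache.spark.sql.types.ArrayType

    // Print top-level nullability and array-element nullability of the
    // affected column for both tables.
    Seq("test_database1.source_table", "test_database1.test_table").foreach { t =>
      val field = spark.table(t).schema("array_column1")
      val elementNullable = field.dataType.asInstanceOf[ArrayType].containsNull
      println(s"$t: nullable=${field.nullable}, element containsNull=$elementNullable")
    }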

JIRA created to track the fix for this behaviour: https://issues.apache.org/jira/browse/HUDI-6438

dht7 commented 1 year ago

Thank you @ad1happy2go for testing the issue and creating the JIRA ticket.

Just wanted to add some additional details:

amrishlal commented 1 year ago

@ad1happy2go I am not able to reproduce the issue against the latest master version of Hudi using either Spark 3.1 or Spark 3.2, following the steps you outlined. Do we know if this issue is limited to older versions of Hudi (version 0.12.2, as reported in the description)?

ad1happy2go commented 1 year ago

Yes, I also confirmed that I am not seeing this issue with master.

@dht7 Can you check against the master code, if possible, and confirm whether you are still facing this issue?

amrishlal commented 1 year ago

The issue was fixed by PR #8725; the error message shows up in builds that predate this PR.

dht7 commented 1 year ago

Thank you @ad1happy2go, I was able to test this and can confirm that we are not facing the issue when using the latest master code.

Thank you @amrishlal for pointing this out and sharing the PR details.

cc: @codope

subash-metica commented 3 months ago

Hi @ad1happy2go ,

Using Hudi: 0.14.1

With this change, is it fair to say that we can't mitigate the error "Incoming batch schema is not compatible with the table's one"?

How should we handle this schema-mismatch scenario, where the table schema has a non-nullable column but the new data contains null values?

Background: I originally got the error that a non-nullable column can't accept null values. I then removed the MAKE_NEW_COLUMNS_NULLABLE parameter, which had originally been configured to support schema evolution, but now I get "Incoming batch schema is not compatible with the table's one", since the Hudi table has null for a column.
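
One way to make the nullability mismatch explicit before the write is to rebuild the incoming batch against the target table's field definitions; a rough sketch with placeholder table names, assuming column names and types already match (rows that actually contain nulls for a non-nullable target column still have to be filtered or defaulted separately):

    import org.apache.spark.sql.types.StructType

    // Placeholder names; substitute the real source and target tables.
    val incoming = spark.table("test_database1.source_table")
    val target   = spark.table("test_database1.test_table").schema

    // Reuse the target table's StructField (and therefore its nullability and
    // array-element containsNull flag) for every column the batch shares with it.
    val alignedSchema = StructType(incoming.schema.map { f =>
      target.find(_.name == f.name).getOrElse(f)
    })
    val aligned = spark.createDataFrame(incoming.rdd, alignedSchema)

    aligned.createOrReplaceTempView("aligned_source")
    spark.sql("insert overwrite table test_database1.test_table " +
      "select id, array_column1 from aligned_source")

This only changes the declared schema of the batch, not the values themselves, so it does not by itself make null data acceptable to a non-nullable column.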