apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Bugs with hudi tables created by the hive catalog, and wrong results when querying the RO table #10735

Open AshinGau opened 9 months ago

AshinGau commented 9 months ago

Describe the problem you faced

  1. When I create a hudi table in the hive catalog, it works well in Flink SQL, but it can't be read by Spark or by the Flink hudi catalog. The hudi table created by the hive catalog appears to have the wrong schema and input format in the Hive metastore, as shown by `SHOW CREATE TABLE`.
  2. After I insert/update/delete rows in a MOR table, Flink returns the same result for the `_ro` table as for the `_rt` table, while Spark returns different results when querying the `_ro` table.

To Reproduce

Flink 1.17.2 + Hudi 0.14.1

Steps to reproduce the behavior:

  1. Launch flink sql
    export FLINK_VERSION=1.17 
    export HUDI_VERSION=0.14.1
    ./bin/sql-client.sh embedded -j lib/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
  2. Create the hive catalog and hudi catalog
    
    -- hive catalog
    create catalog hive with (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' ='/usr/local/service/hive/conf');

    -- hudi catalog
    create catalog hudi with (
    'type' = 'hudi',
    'catalog.path' = 'hdfs://xxx/hudi_flink_hive_catalog',
    'hive.conf.dir' = '/usr/local/service/hive/conf',
    'mode' = 'hms');

3. Create a hudi table in hive catalog
Use the following SQL to create a hudi table in the hive catalog. Flink 1.17 can insert into the partitioned table, but throws errors when querying it. It works well in Flink 1.14, so it may be a bug in Flink 1.17 + Hudi 0.14.

use catalog hive;
use hudi_flink;
CREATE TABLE hive_ctl_table(
  ts BIGINT,
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  rider VARCHAR(20),
  driver VARCHAR(20),
  fare DOUBLE,
  city VARCHAR(20)
)
-- PARTITIONED BY (city) -- flink 1.17 can insert into the partitioned table, but throws errors when querying; it works well in flink 1.14
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://xxx/hudi_flink.db/hive_ctl_table',
  'table.type' = 'MERGE_ON_READ'
);

The hudi table created by the hive catalog can be inserted/updated/deleted/selected through Flink SQL, but querying it from Spark or from the Flink hudi catalog throws errors. The table appears to have the wrong schema and input format in the Hive metastore, as shown by `SHOW CREATE TABLE`: it has no fields and is stored as `TextInputFormat`.
![image](https://github.com/apache/hudi/assets/19337507/ee422bbc-0eb9-4968-9cea-6c9d82bf12fd)
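For comparison, a correctly synced Hudi MOR table in the Hive metastore normally carries the full column list (including Hudi's meta columns) and Hudi's own input format rather than `TextInputFormat`. A rough sketch of what `SHOW CREATE TABLE` would be expected to show for the read-optimized view, assuming the standard Hudi hive-sync class names and the columns from the DDL above:

```sql
-- Expected shape (roughly) for the read-optimized view of a MOR table:
CREATE EXTERNAL TABLE hive_ctl_table_ro(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  ts bigint,
  uuid string,
  rider string,
  driver string,
  fare double,
  city string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://xxx/hudi_flink.db/hive_ctl_table';
```

The table in the screenshot above has neither the column list nor the Hudi input format, which is why engines that rely on the metastore metadata fail.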
4. Create a hudi table in hudi catalog

use catalog hudi;
use hudi_flink;
CREATE TABLE hudi_ctl_table(
  ts BIGINT,
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  rider VARCHAR(20),
  driver VARCHAR(20),
  fare DOUBLE,
  city VARCHAR(20)
)
PARTITIONED BY (city)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://xxx/hudi_ctl_table',
  'table.type' = 'MERGE_ON_READ'
);


After creating the table, use the insert/update/delete commands from https://hudi.apache.org/docs/flink-quick-start-guide#insert-data to produce data. In Flink, the result of querying the `_ro` table is the same as the `_rt` table, but Spark returns different results when querying the `_ro` table.
![image](https://github.com/apache/hudi/assets/19337507/ae376dee-bb34-4ac0-ba75-d43e9ef9fc2c)
When I list the files under the hudi path, there are only log files and no base files, so the result should be empty when querying the `_ro` table; however, Flink returns a result that merges the insert/update/delete operations, the same as for the `_rt` table.
![image](https://github.com/apache/hudi/assets/19337507/17cf2c44-3109-4928-bff9-fba9eedf846d)
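To make the expected `_ro` vs `_rt` semantics concrete: the read-optimized view is supposed to read only base (parquet) files, while the real-time view merges base files with log files. A hedged sketch, assuming the Hive-synced table names follow the usual `_ro`/`_rt` suffix convention:

```sql
-- Read-optimized view: reads base files only.
-- With only log files on disk, this is expected to return no rows.
SELECT * FROM hudi_ctl_table_ro;

-- Real-time view: merges base files with log files,
-- so it reflects the insert/update/delete operations above.
SELECT * FROM hudi_ctl_table_rt;
```

Spark behaves this way (empty `_ro` result), whereas Flink returns the merged result for both views, which is the inconsistency reported here.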

**Expected behavior**

1. The hudi table created by the hive catalog only has the wrong schema and input format in the metastore; it can still be parsed correctly by reading `.hoodie`. I am a Doris committer; after I found this bug, I submitted a PR (https://github.com/apache/doris/pull/31181) that reads `.hoodie` to get the right schema. It works well when querying hudi tables created by the hive catalog.
2. I am not sure whether the Flink SQL result for the `_ro` table is correct, but it is inconsistent with Spark, and the Flink result is likely to be incorrect.

**Environment Description**

* Hudi version : 0.14.1

* Flink version: 1.17.2

* Spark version : 3.2.1

* Hive version : 3.1.1

* Hadoop version : 3.2.2

* Storage (HDFS/S3/GCS..) : HDFS 3.2.2

* Running on Docker? (yes/no) : no

danny0405 commented 9 months ago

We should not use the Hive catalog; that's why we introduced the HoodieHiveCatalog, where we do many extra tasks in createTable.

AshinGau commented 9 months ago

> We should not use Hive catalog, that's why we introduce a HoodieHiveCatalog where we do many tasks for createTable.

OK. I would strongly recommend that users use the hudi catalog, and that the abnormal behavior in the hive catalog no longer be maintained. What about the unexpected result of querying the `_ro` table created by the hudi catalog? Is it a bug in Flink SQL?

danny0405 commented 9 months ago

Probably. Can you show the table parameters read from the Hudi hive catalog for the problematic ro table?

ad1happy2go commented 7 months ago

@AshinGau Any updates here?