apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to query different queries via Hive 3 & Trino #8038

Closed stayrascal closed 1 year ago

stayrascal commented 1 year ago

Describe the problem you faced

Hi, I'm using Flink 1.16 to create some tables (COW & MOR) based on HoodieCatalog, writing data to these tables, and then trying to query the data from other engines. But I'm confused about how to run incremental queries via Hive; I didn't find any official documentation about this apart from the demo commands at https://github.com/apache/hudi/blob/master/docker/demo/hive-incremental-mor-rt.commands.
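For reference, the pattern in that demo file comes down to a few session settings plus a commit-time predicate. A rough sketch (the `stock_ticks_mor_rt` table and its columns are the demo's own; `<commit_ts>` is a placeholder):

```
-- Sketch of the incremental-query pattern from hive-incremental-mor-rt.commands;
-- table/column names come from the demo, <commit_ts> is a placeholder commit time.
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hoodie.stock_ticks_mor_rt.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_mor_rt.consume.max.commits=3;
set hoodie.stock_ticks_mor_rt.consume.start.timestamp='<commit_ts>';
select `_hoodie_commit_time`, symbol, ts from stock_ticks_mor_rt
where `_hoodie_commit_time` > '<commit_ts>';
```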

I ran into two problems:

To Reproduce

Steps to reproduce the behavior:

1. Use Flink to create the Hudi catalog and tables, and insert data.

```
CREATE CATALOG hms_catalog WITH (
  'type' = 'hudi',
  'catalog.path' = 'hdfs://xxxxx-1:8020/xxxx/hive',
  'hive.conf.dir' = '/xxxxx/hive/conf/',
  'mode' = 'hms'
);

CREATE DATABASE hudi_hms_db;

CREATE TABLE hms_catalog.hudi_hms_db.flink_hudi_mor_tbl (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts',
  'hive_sync.enabled' = 'true'
);

CREATE TABLE hms_catalog.hudi_hms_db.flink_hudi_mor_streaming_tbl (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts',
  'hive_sync.enabled' = 'true'
);

CREATE TABLE flink_hudi_cow_tbl (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts'
);

-- Write twice with different uuids; no compaction is triggered since there are only 2 commits.
INSERT INTO hms_catalog.hudi_hms_db.flink_hudi_mor_tbl VALUES
  ('id31','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id32','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id33','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id34','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id35','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id36','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id37','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id38','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

-- Write to another MOR table and trigger compactions.
CREATE TABLE default_catalog.default_database.fake_datasource (
  uuid STRING,
  name STRING,
  age INT,
  ts AS PROCTIME(),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'faker',
  'rows-per-second' = '2',
  'fields.uuid.expression' = '#{numerify ''id####''}',
  'fields.name.expression' = '#{superhero.name}',
  'fields.age.expression' = '#{number.numberBetween ''20'',''50''}',
  'fields.partition.expression' = '#{Options.option ''par1'',''par2'',''par3'',''par4''}',
  'fields.ts.expression' = '#{date.past ''45'',''10'',''SECONDS''}'
);

INSERT INTO hms_catalog.hudi_hms_db.flink_hudi_mor_streaming_tbl
SELECT * FROM default_catalog.default_database.fake_datasource;

-- Write three times with different uuids.
INSERT INTO hms_catalog.hudi_hms_db.flink_hudi_cow_tbl VALUES
  ('id31','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id32','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id33','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id34','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id35','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id36','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id37','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id38','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
```

2. Use Spark to show the commits.

```
spark-sql> call show_commits(table => 'hudi_hms_db.flink_hudi_mor_tbl');
20230216160243458  4267     0  4  4  8   8   0
20230216160153391  4260     0  4  4  8   0   0
Time taken: 0.084 seconds, Fetched 2 row(s)

spark-sql> call show_commits(table => 'hudi_hms_db.flink_hudi_cow_tbl');
20230219145900116  1741955  0  4  4  24  0   0
20230216154007116  1741814  0  4  4  16  0   0
20230216154001168  1741175  4  0  4  8   0   0
Time taken: 0.44 seconds, Fetched 3 row(s)

spark-sql> call show_commits(table => 'hudi_hms_db.flink_hudi_mor_streaming_tbl');
20230216222718023  16248    0  4  4  60    60  0
20230216222648287  16137    0  4  4  60    60  0
20230216222617980  16256    0  4  4  60    60  0
20230216222548075  16363    0  4  4  60    60  0
20230216222548025  1917272  0  4  4  8163  71  0
20230216222518018  16266    0  4  4  60    60  0
20230216222448002  16384    0  4  4  60    60  0
20230216222418042  16273    0  4  4  60    60  0
20230216222347982  16307    0  4  4  60    60  0
20230216222318041  16275    0  4  4  60    60  0
Time taken: 0.533 seconds, Fetched 10 row(s)
```

3. **Problem 1: querying all records via Hive returns an empty result, but a count aggregation works.**

```
0: jdbc:hive2://xxxx-1:10000/> set hive.vectorized.execution.enabled=false;
0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_tbl_rt;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (8.158 seconds)
0: jdbc:hive2://xxxx-1:10000/> select * from flink_hudi_mor_tbl_rt;
-- empty result; the header shows the expected columns: _hoodie_commit_time,
-- _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path,
-- _hoodie_file_name, uuid, name, age, ts, partition
No rows selected (0.143 seconds)
```
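One setting that may matter here, judging by the Hudi docker demo: snapshot reads of the `_rt` table are run with Hive's plain `HiveInputFormat`, in addition to disabling vectorized execution. A sketch worth trying (that the same setup applies to Hive 3 is an assumption on my part):

```
-- Assumption: as in the docker demo commands, force the plain HiveInputFormat
-- before reading the _rt table, then retry the snapshot query.
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.vectorized.execution.enabled=false;
select * from flink_hudi_mor_tbl_rt limit 10;
```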

4. Incremental queries against the COW table work, but it seems the `_hoodie_commit_time` filter condition is not needed, unlike in https://github.com/apache/hudi/blob/master/docker/demo/hive-incremental-cow.commands.

```
-- count all records
0: jdbc:hive2://xxxx-1:10000/> select count(*) from hudi_hms_db.flink_hudi_cow_tbl;
+------+
| _c0  |
+------+
| 24   |
+------+
1 row selected (12.162 seconds)

set hoodie.flink_hudi_cow_tbl.consume.mode=INCREMENTAL;
set hoodie.flink_hudi_cow_tbl.consume.max.commits=1;
-- first commit time
set hoodie.flink_hudi_cow_tbl.consume.start.timestamp='20230216154001168';

0: jdbc:hive2://xxxxx-1:10000/> select count(*) from hudi_hms_db.flink_hudi_cow_tbl;
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (4.141 seconds)

set hoodie.flink_hudi_cow_tbl.consume.max.commits=2;
0: jdbc:hive2://xxxx-1:10000/> select count(*) from hudi_hms_db.flink_hudi_cow_tbl;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (4.141 seconds)
```
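(For completeness: these `consume.*` options are session-level, so the mode presumably has to be reset after the experiment; as far as I can tell, `SNAPSHOT` is the default mode in Hudi's Hive integration. A sketch:)

```
-- Hypothetical cleanup: switch the session back to regular snapshot reads.
set hoodie.flink_hudi_cow_tbl.consume.mode=SNAPSHOT;
select count(*) from hudi_hms_db.flink_hudi_cow_tbl;  -- should show all 24 rows again
```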

5. **Problem 2: incremental queries against the MOR table don't seem to work.**

```
-- disable vectorized execution
0: jdbc:hive2://xxxx-1:10000/> set hive.vectorized.execution.enabled=false;

-- there are 8163 records in the base files, and 8214 records in the whole table
0: jdbc:hive2://emr-master-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_ro;
+-------+
|  _c0  |
+-------+
| 8163  |
+-------+
0: jdbc:hive2://emr-master-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_rt;
+-------+
|  _c0  |
+-------+
| 8214  |
+-------+

set hoodie.flink_hudi_mor_streaming_tbl_ro.consume.mode=INCREMENTAL;
set hoodie.flink_hudi_mor_streaming_tbl_ro.consume.max.commits=1;

-- set to the first commit timestamp
set hoodie.flink_hudi_mor_streaming_tbl_ro.consume.start.timestamp='20230216222318041';

-- [PROBLEM 2-1] it seems to count all commits instead of just one commit
-- when there is no filter condition
0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_ro;
+-------+
|  _c0  |
+-------+
| 8163  |
+-------+
1 row selected (14.164 seconds)

-- it works with a filter condition on _hoodie_commit_time, which the COW table
-- seems not to need
0: jdbc:hive2://xxxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_ro where _hoodie_commit_time > '20230216222318041';
+------+
| _c0  |
+------+
| 237  |
+------+
1 row selected (6.162 seconds)

-- change the max commits from 1 to 2
set hoodie.flink_hudi_mor_streaming_tbl_ro.consume.max.commits=2;

-- [PROBLEM 2-2] nothing changes even though the max commits setting is changed
0: jdbc:hive2://xxxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_ro where _hoodie_commit_time > '20230216222318041';
+------+
| _c0  |
+------+
| 237  |
+------+
1 row selected (6.162 seconds)

0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_ro;
+-------+
|  _c0  |
+-------+
| 8163  |
+-------+
1 row selected (14.164 seconds)

-- try incremental queries against the RT table
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.mode=INCREMENTAL;
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.max.commits=1;
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.start.timestamp='20230216222318041';

0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_rt;
+-------+
|  _c0  |
+-------+
| 8214  |
+-------+
1 row selected (2.138 seconds)

0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_rt where _hoodie_commit_time > '20230216222318041';
+------+
| _c0  |
+------+
| 473  |
+------+
1 row selected (8.193 seconds)

-- set the max commits from 1 to 2
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.max.commits=2;
0: jdbc:hive2://xxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_rt;
+-------+
|  _c0  |
+-------+
| 8214  |
+-------+
1 row selected (6.146 seconds)
0: jdbc:hive2://xxxxx-1:10000/> select count(*) from flink_hudi_mor_streaming_tbl_rt where _hoodie_commit_time > '20230216222318041';
+------+
| _c0  |
+------+
| 473  |
+------+
1 row selected (2.162 seconds)
```
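For comparison, the full recipe I would expect from the demo file for the `_rt` table, with all the pieces in one session (the commit timestamp is the earliest one from `show_commits` above; whether the input-format setting is also required on Hive 3 is an assumption):

```
-- Assumed full recipe, mirroring hive-incremental-mor-rt.commands:
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.vectorized.execution.enabled=false;
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.mode=INCREMENTAL;
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.max.commits=1;
set hoodie.flink_hudi_mor_streaming_tbl_rt.consume.start.timestamp='20230216222318041';
select count(*) from flink_hudi_mor_streaming_tbl_rt
where `_hoodie_commit_time` > '20230216222318041';
```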




**Environment Description**

* Hudi version : 0.12.2
* Spark version : 3.2.1
* Hive version : 3.1.2
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no

stayrascal commented 1 year ago

So, regarding incremental queries via Hive on COW & MOR tables: we have to add the `_hoodie_commit_time` filter condition for the MOR table, but the COW table doesn't need it, right?

stayrascal commented 1 year ago

And in the Trino case, it cannot count all the records of the RT table:

trino> select count(*) from hive.hudi_hms_db.flink_hudi_mor_streaming_tbl_rt;
 _col0
-------
  8163
(1 row)

Query 20230224_100730_00008_sxxri, FINISHED, 2 nodes
Splits: 21 total, 21 done (100.00%)
0.65 [8.16K rows, 1.72MB] [12.6K rows/s, 2.66MB/s]

trino> select count(*) from hive.hudi_hms_db.flink_hudi_mor_streaming_tbl_ro;
 _col0
-------
  8163
(1 row)

Query 20230224_100735_00009_sxxri, FINISHED, 2 nodes
Splits: 21 total, 21 done (100.00%)
0.61 [8.16K rows, 1.72MB] [13.3K rows/s, 2.8MB/s]

trino> select count(*) from hive.hudi_hms_db.flink_hudi_mor_streaming_tbl;
 _col0
-------
     0
(1 row)

Query 20230224_100738_00010_sxxri, FINISHED, 2 nodes
Splits: 18 total, 18 done (100.00%)
0.56 [0 rows, 0B] [0 rows/s, 0B/s]

And if the MOR table hasn't gone through any compaction yet, querying the RT table throws an exception that the file is not a valid Parquet file (the reader hits a log file where it expects a base file). Is this expected behavior?

trino> select * from hive.hudi_hms_db.flink_hudi_mor_tbl_rt;

Query 20230224_100913_00011_sxxri, FAILED, 2 nodes
Splits: 4 total, 0 done (0.00%)
0.51 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20230224_100913_00011_sxxri failed: Not valid Parquet file: hdfs://xxxxxxx/hive/hudi_hms_db/flink_hudi_mor_tbl/par3/.83b4db58-a84b-40b5-b38d-d79acfa8db3c_20230216160153391.log.1_0-1-0 expected magic number: PAR1 got: #
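For context: before a compaction has run, a MOR file slice can consist of log files only, which appears to be why the connector above chokes on a `.log` file while expecting Parquet. A hedged Flink-side sketch that makes compaction kick in sooner (the table name `flink_hudi_mor_tbl_v2` is hypothetical; `compaction.async.enabled` and `compaction.delta_commits` are documented Hudi Flink options):

```
-- Hypothetical variant of the MOR table that compacts after every delta commit,
-- so a Parquet base file exists for engines that cannot read log files.
CREATE TABLE hms_catalog.hudi_hms_db.flink_hudi_mor_tbl_v2 (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'precombine.field' = 'ts',
  'compaction.async.enabled' = 'true',  -- default is true for streaming writes
  'compaction.delta_commits' = '1'      -- default is 5; compact after each commit
);
```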
danny0405 commented 1 year ago

Support for the Trino MOR table type will land in release 0.14.0. For the Hive 3 query, did you modify any Hive code to support that? Here is a document about Hive & Flink: https://www.yuque.com/docs/share/879349ce-7de4-4284-9126-9c2a3c93a91d?#%20%E3%80%8AHive%20On%20Hudi%E3%80%8B

codope commented 1 year ago

Currently, snapshot queries using Trino are only supported for COW tables. For MOR tables, you can only do read-optimized queries. Support matrix: https://hudi.apache.org/docs/querying_data#support-matrix. We are working on adding MOR snapshot query support in https://github.com/trinodb/trino/pull/14786.