apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink write to COW Hudi table, Hive aggregate query results have duplicate data but select * does not #10486

Closed CamelliaYjli closed 5 months ago

CamelliaYjli commented 6 months ago

Describe the problem you faced

I use Flink to write to a Hudi COW table and sync it to Hive, but Hive aggregate queries (e.g. count(), row_number() over()) return duplicate data while select * does not.

To Reproduce

Steps to reproduce the behavior:

  1. Write to a Hudi COW table with Flink SQL, using the upsert write operation:

        String hudiSinkDDL = "CREATE TABLE hudi_table(\n" +
                "id String,\n" +
                "name String,\n" +
                "age Int,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                // Basic configuration
                "'write.operation' = 'upsert',\n" +
                "'write.precombine' = 'true',\n" +
                "'connector' = 'hudi',\n" +
                "'path'= '${basePath}',\n" +
                "'table.type' = 'COPY_ON_WRITE',\n" +
                "'write.tasks' = '2',\n" +
                "'write.bucket_assign.tasks' = '2',\n" +
                // Hive sync configuration
                "'hive_sync.conf.dir'='/opt/apache-hive-3.1.3-bin/conf',\n" +
                "'hive_sync.enabled' = 'true',\n" + // register and sync the table to the Hive metastore
                "'hive_sync.mode' = 'hms',\n" + // sync via the Hive metastore (HMS)
                "'hive_sync.metastore.uris' = 'thrift://localhost:9083',\n" +
                "'hive_sync.db' = 'cdc_hudi',\n" +
                "'hive_sync.table' = '${tableName}',\n" +
                // Small-file & compression configuration
                "'clean.retain_commits' = '1',\n" + 
                "'metadata.compaction.delta_commits' = '5',\n" +
                "'hoodie.parquet.compression.codec' = 'gzip',\n" + 
                "'hoodie.parquet.max.file.size' = '268435456'\n" +
                ")";
  2. Insert data into MySQL, then update it:
-- insert
insert into table_test_duplicate_1(id,name,age) values('dup_clean_1','Camellia',11);
-- update
update table_test_duplicate_1 set age = 20 where id ='dup_clean_1';
  3. select * from cdc_hudi.table_test_duplicate_1 where id = 'dup_clean_1'; returns the expected single row.
  4. Aggregate queries return duplicated data:
select count(1) from cdc_hudi.table_test_duplicate_1 where id = 'dup_clean_1';
select
  *,
  row_number() over (partition by id order by age desc) as rank
from cdc_hudi.table_test_duplicate_1 where id = 'dup_clean_1';

Expected behavior

Why do aggregate queries and regular queries return inconsistent results? Your help is appreciated.

Environment Description

xicm commented 5 months ago

What's your Hive execution engine? Did you update the hudi-hadoop-mr-bundle jar in hive.tar.gz or tez.tar.gz on HDFS?

CamelliaYjli commented 5 months ago

What's your Hive execution engine? Did you update the hudi-hadoop-mr-bundle jar in hive.tar.gz or tez.tar.gz on HDFS?

Sorry for the late reply. I am using Hive-on-MR, and hudi-hadoop-mr-bundle-0.14.0.jar has been added to ${HIVE_HOME}/auxlib.
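
For reference, a per-session alternative to auxlib is to register the bundle from the Hive CLI or Beeline before querying (a sketch; the jar path here is hypothetical):

-- Register the Hudi MR bundle for the current session only (hypothetical path).
ADD JAR hdfs:///libs/hudi-hadoop-mr-bundle-0.14.0.jar;
-- Confirm it is on the session classpath.
LIST JARS;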

danny0405 commented 5 months ago

Is the hive table synced automatically from the ingestion job?

CamelliaYjli commented 5 months ago

Is the hive table synced automatically from the ingestion job?

Yes, Hive synchronization has been enabled.

danny0405 commented 5 months ago

can you show us the create table statement from Hive?

CamelliaYjli commented 5 months ago

can you show us the create table statement from Hive?

Okay, the table in Hive is an external table automatically generated during synchronization. The statement is as follows:

CREATE EXTERNAL TABLE cdc_hudi.table_test_duplicate_1(
  _hoodie_commit_time string COMMENT '',
  _hoodie_commit_seqno string COMMENT '',
  _hoodie_record_key string COMMENT '',
  _hoodie_partition_path string COMMENT '',
  _hoodie_file_name string COMMENT '',
  id string COMMENT '',
  name string COMMENT '',
  age int COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='hdfs://localhost:8020/user/hive/warehouse/cdc_hudi.db/table_test_duplicate_1')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://localhost:8020/user/hive/warehouse/cdc_hudi.db/table_test_duplicate_1'
TBLPROPERTIES (
  'last_commit_completion_time_sync'='20240112161204004',
  'last_commit_time_sync'='20240112160716028',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}}]}',
  'transient_lastDdlTime'='1704939293')

danny0405 commented 5 months ago

Looks good, @xicm can you help confirm this issue?

xicm commented 5 months ago

Seems like a bug.

CamelliaYjli commented 5 months ago

Seems like a bug.

When I set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat, the result is correct. Is this setting necessary before querying?

danny0405 commented 5 months ago

Yeah, you should use HoodieHiveInputFormat or HoodieCombineHiveInputFormat. Here is a Chinese doc you can use as a reference: https://www.yuque.com/yuzhao-my9fz/kb/kgv2rb
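
For reference, a minimal sketch of the session-level fix in the Hive CLI or Beeline (the query is the one from this issue; the property can also be set globally in hive-site.xml):

-- Use Hudi's combine input format so splits are planned against the
-- latest file slices only, filtering out older file versions.
SET hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

-- Re-run the aggregate; the count should now be 1 for this key.
select count(1) from cdc_hudi.table_test_duplicate_1 where id = 'dup_clean_1';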

CamelliaYjli commented 5 months ago

Yeah, you should use HoodieHiveInputFormat or HoodieCombineHiveInputFormat. Here is a Chinese doc you can use as a reference: https://www.yuque.com/yuzhao-my9fz/kb/kgv2rb

OK, thx ~