apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] We are getting Parquet not found error while reading a Hudi table from Flink. #10700

Closed fanfanAlice closed 4 months ago

fanfanAlice commented 5 months ago


Describe the problem you faced


To Reproduce

Steps to reproduce the behavior:

  1. task1: Flink reads Kafka data, writes it to Hudi table table1, and syncs the table to Hive.
  2. task2: Flink reads Kafka data, writes it to Hudi table table2, and syncs the table to Hive.
  3. A few days later, the Flink job writing table1 failed.
  4. I used Flink to write the data from table2 into table1.
  5. The Flink task threw an exception: Caused by: java.io.FileNotFoundException: File does not exist: hdfs://admin-stage/user/tempuser/hudi/hudipath/tbale_name/343f7bec-e29d-4b1e-a429-463c8efb09fb-0_91-23390-2236222_20200921075346.parquet

Expected behavior


Environment Description

Additional context


Stacktrace

I found a similar issue: https://github.com/apache/hudi/issues/2098. However, I don't understand the specific solution to that problem. Task1 and task2 run until task1 fails. I don't know how to set up the configuration so that table2's data can be written to table1 without errors when task1 fails.
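
(For context: this class of FileNotFoundException usually means a reading job still references a file slice that the writing job's cleaner has already deleted. Below is a minimal, hypothetical sketch of the Hudi Flink options that typically govern this, assuming the backup table is read as a stream; the table name, path usage, and values are placeholders, not a confirmed fix for this report.)

-- Hedged sketch, reader side: pin the streaming read to an instant that still exists on the timeline
create table hudi_read_sketch(
  id string,
  hd_tx_date string
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = '20231211000000'   -- placeholder instant, must still be on the timeline
);
-- Hedged sketch, writer side of the same table: retain more commits before old file slices are
-- cleaned, e.g. 'clean.retain_commits' = '60' in the writing job's WITH clause.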

fanfanAlice commented 5 months ago

(screenshot of the exception attached)

ad1happy2go commented 5 months ago

@fanfanAlice I hope table1 and table2 were created at different locations. If yes, does the path in the exception belong to table1 or table2?

Can you provide a code snippet to reproduce this issue?

danny0405 commented 5 months ago

Can you make sure that table1 and table2 have successful commits?

fanfanAlice commented 5 months ago

Flink SQL task1 reads data from a Kafka topic, joins it with an HBase table, and inserts the result into Hudi table table1. The Kafka topic only keeps data for three hours, there are hundreds of millions of Kafka and HBase records every day, and tasks often fail due to resource issues. Because this is a banking system, a failed task cannot be restarted immediately; it must be restarted during the production window, and by then the Kafka topic data may already have been lost. So there is another Flink task that backs up the Kafka topic's data to a Hudi table in real time; this is Flink SQL task2. When task1 fails, a third job reads the task2 Hudi table, joins it with the HBase table, and reinserts the result into the Hudi table written by task1; this is Flink SQL task3.

CREATE TABLE hbase_mb6_page_bus(rowkey string,
 info row<
 bus                   string
,bdha_hd_tx_time          string
,hd_tx_date               string>,
 PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb6_page_bus',
'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
'properties.hbase.security.authentication'='kerberos', 
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
CREATE TABLE hbase_mb_videomanage(rowkey string,
 info row<
 bvm_videoname            string
,bvm_videourl             string
,bvm_videolength          string
,bvm_starttime            string
,bvm_endtime              string
,bvm_videotitle           string
,bvm_video                string
,bvm_imgurl               string
,bvm_channel              string
,bvm_watchnum             bigint
,bvm_messageurl           string
,bvm_belongcolumn         string
,bvm_keyword              string
,bvm_isallowinteract      string
,bvm_isallowbulletscreem  string
,bvm_isallowshare         string
,bvm_watchcrowd           string
,bvm_out                  string
,bvm_recommend            string
,bvm_sharetitle           string
,bvm_sharecontent         string
,bvm_correlationvideo     string
,bvm_comperenumber        string
,bvm_recommendorder       bigint
,bvm_video_label          bigint
,bvm_org_code             string
,bvm_channle_id           string
,bvm_hot_flag             string
,bvm_operator_id          string
,bvm_video_flag           string
,bvm_recommend_time       string
,bvm_imgurl6              string
,bvm_configtype           string
,bvm_modify_backimage     string
,bvm_backimage            string
,bvm_isposition           string
,bvm_videourl_new         string
,bvm_messageurl_new       string
,bdha_hd_tx_time          string
,hd_tx_date               string >,
 PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb_videomanage',
'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
'properties.hbase.security.authentication'='kerberos', 
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);
create table cabs_task_mgm_m2 (
id string,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime as PROCTIME()
) WITH (
 'connector' = 'kafka-token',
 'topic' = 'CABS-TASK-MGM-M2',
 'properties.topic' = 'CABS-TASK-MGM-M2',
 'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
 'properties.rdeg.location' = 'zj',
 'properties.rdeg.token' = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
 'properties.group.id' = 'g1',
 'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
 'format' = 'json',
 'properties.flink.partition-discovery.interval-millis' = '1000',
 'properties.zookeeper.sasl.client' = 'false',
 'properties.zookeeper.sasl.clientconfig' = 'false'
 );

--select * from kafka_rcsp_label_table;

create table streaming_rcsp_label(
id string  PRIMARY KEY NOT ENFORCED,
shareId string ,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string,
bvm_org_code string,
bus string
)
with (
 'connector' = 'hudi',
 'path' = 'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
 'hoodie.parquet.max.file.size' = '268435456',
 'write.tasks' = '4',
 'write.bucket_assign.tasks'='1',
 'write.task.max.size'='1024',
 'write.rate.limit'='30000',
 --'hoodie.datasource.write.precombine.field'='ts',
 'table.type'='MERGE_ON_READ',
 'changelog.enable'='true',
  'write.operation'='INSERT',
 'compaction.tasks'='1',
 'compaction.delta_commits'='5',
 'compaction.max_memory'='500',
 'hive_sync.enable'='true',
 'hive_sync.mode'='hms',
 'hive_sync.table'='streaming_rcsp_label',
 'hive_sync.db'='kylinstreamdb',
 'hive_sync.support_timestamp'='true',
'hive_sync_metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
 );
insert into streaming_rcsp_label(
 id                    
,shareId
,sharerUserId
,sharerEcifId
,sharedUserId
,sharedEcifId
,bindTime
,bindStatus
,finishTime
,procTime
,hd_tx_date_h
,hd_tx_date
,bvm_org_code
,bus
,sysdt
)
select 
t1.id,
t1.shareId ,
t1.sharerUserId ,
t1.sharerEcifId ,
t1.sharedUserId ,
t1.sharedEcifId ,
t1.bindTime ,
t1.bindStatus,
t1.finishTime,
t1.procTime,
t2.bvm_org_code,
t3.bus,
cast(NOW() as string) as sysdt, 
concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
substr(cast(NOW() as string),1, 10) as hd_tx_date
from kafka_rcsp_label_table t1
left join  hbase_mb_videomanage for system_time as of t1.procTime as t2
on t1.videoCode=t2.rowkey
left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
on t1.mbk_usdf_evnt_id=t3.rowkey;

Flink SQL task2: back up the Kafka topic to Hudi.

create table cabs_task_mgm_m2 (
id string,
shareId string,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime as PROCTIME()
) WITH (
 'connector' = 'kafka-token',
 'topic' = 'CABS-TASK-MGM-M2',
 'properties.topic' = 'CABS-TASK-MGM-M2',
 'properties.rdeg.console' = 'rdeg-chn-slb-pj-1.bocomm.com:8089',
 'properties.rdeg.location' = 'zj',
 'properties.rdeg.token' = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzeXN0ZW1OYW1lIjoiREJCSSIsInJ1bkVudiI6InByZCIsIm5vd1RpbWUiOjE2ODQ5Nzk3MDQ3NTh9.De5kUMfRtCRgNPa3SsjJoHpufTQ9uHSN0c9y2DCekK8',
 'properties.group.id' = 'g1',
 'scan.startup.mode' = 'latest-offset', --latest-offset/earliest-offset
 'format' = 'json',
 'properties.flink.partition-discovery.interval-millis' = '1000',
 'properties.zookeeper.sasl.client' = 'false',
 'properties.zookeeper.sasl.clientconfig' = 'false'
 );
-- The kafka connector authentication method has been modified
create table streaming1_cabs_task_mgm_m2(
id string  PRIMARY KEY NOT ENFORCED,
shareId string ,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string
) with (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
'hoodie.datasource.write.precombine.field'='ts',
'table.type'='MERGE_ON_READ',
--'index.type'='BUCKET',
'write.operation'='UPSERT',
'changelog.enable'='true',
'compaction.tasks'='1',
'compaction.async.enable'='false',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming1_cabs_task_mgm_m2',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
'hive_sync_metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);

insert into streaming1_cabs_task_mgm_m2(
id,
shareId  ,
sharerUserId ,
sharerEcifId ,
sharedUserId ,
sharedEcifId ,
bindTime ,
bindStatus ,
finishTime ,
 procTime ,
 hd_tx_date_h,
 hd_tx_date
)
select 
id ,
shareId  ,
sharerUserId ,
sharerEcifId ,
sharedUserId ,
sharedEcifId ,
bindTime ,
bindStatus ,
finishTime ,
 procTime ,
concat(substr(cast(procTime as string),1, 13), ':00:00') as hd_tx_date_h,
substr(cast(procTime as string),1, 10)   hd_tx_date
from cabs_task_mgm_m2 t1 where id is not null;

Flink SQL task3: when task1 fails, read the task2 Hudi table, join it with the HBase table, and reinsert the result into the Hudi table written by task1.

create table streaming1_cabs_task_mgm_m2(
id string  PRIMARY KEY NOT ENFORCED,
shareId string ,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string
) with (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
'hoodie.parquet.max.file.size' = '268435456',
'write.tasks' = '4',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
'hoodie.datasource.write.precombine.field'='ts',
'table.type'='MERGE_ON_READ',
--'index.type'='BUCKET',
'write.operation'='UPSERT',
'changelog.enable'='true',
'compaction.tasks'='1',
'compaction.async.enable'='false',
'compaction.delta_commits'='5',
'compaction.max_memory'='500',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.table'='streaming1_cabs_task_mgm_m2',
'hive_sync.db'='kylinstreamdb',
'hive_sync.support_timestamp'='true',
'hive_sync_metastore.uris'='thrift://xxxxx:xxxx,thrift://xxxxxx:xxxx',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
);

CREATE TABLE hbase_mb6_page_bus(rowkey string,
 info row<
 bus                   string
,bdha_hd_tx_time          string
,hd_tx_date               string
 >,
 PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb6_page_bus',
'zookeeper.quorum'='xxxx:24002,xxxx:24002,xxxx:24002',
'properties.hbase.security.authentication'='kerberos', 
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xxx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);

CREATE TABLE hbase_mb_videomanage(rowkey string,
 info row<
 bvm_videoname            string
,bvm_videourl             string
,bvm_videolength          string
,bvm_starttime            string
,bvm_endtime              string
,bvm_videotitle           string
,bvm_video                string
,bvm_imgurl               string
,bvm_channel              string
,bvm_watchnum             bigint
,bvm_messageurl           string
,bvm_belongcolumn         string
,bvm_keyword              string
,bvm_isallowinteract      string
,bvm_isallowbulletscreem  string
,bvm_isallowshare         string
,bvm_watchcrowd           string
,bvm_out                  string
,bvm_recommend            string
,bvm_sharetitle           string
,bvm_sharecontent         string
,bvm_correlationvideo     string
,bvm_comperenumber        string
,bvm_recommendorder       bigint
,bvm_video_label          bigint
,bvm_org_code             string
,bvm_channle_id           string
,bvm_hot_flag             string
,bvm_operator_id          string
,bvm_video_flag           string
,bvm_recommend_time       string
,bvm_imgurl6              string
,bvm_configtype           string
,bvm_modify_backimage     string
,bvm_backimage            string
,bvm_isposition           string
,bvm_videourl_new         string
,bvm_messageurl_new       string
,bdha_hd_tx_time          string
,hd_tx_date               string
 >,
 PRIMARY KEY(rowkey) NOT ENFORCED
) with ('connector'='hbase-2.2',
'table-name'='mb_videomanage',
'zookeeper.quorum'='xxxxx:24002,xxxxx:24002,xxxxx:24002',
'properties.hbase.security.authentication'='kerberos', 
'properties.hbase.regionserver.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.master.kerberos.principal'='hbase/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'properties.hbase.regionserver.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab',
'properties.hbase.master.keytab.file'='/opt/huawei/Bigdata/FusionInsight_HD_8.1.2.5/install/FusionInsight-HBase-2.2.3/keytabs/HBase/hbase.keytab'
);

create table streaming_rcsp_label(
id string  PRIMARY KEY NOT ENFORCED,
shareId string ,
sharerUserId string,
sharerEcifId string,
sharedUserId string,
sharedEcifId string,
bindTime string,
bindStatus string,
finishTime string,
 procTime TIMESTAMP,
hd_tx_date_h string,
hd_tx_date string,
bvm_org_code string,
bus string
)
with (
 'connector' = 'hudi',
 'path' = 'hdfs://hacluster/user/hive/warehouse/kylinstreamdb.db/streaming_rcsp_label',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'hoodie.datasource.write.partitionpath.field'='hd_tx_date',
 'hoodie.parquet.max.file.size' = '268435456',
 'write.tasks' = '4',
 'write.bucket_assign.tasks'='1',
 'write.task.max.size'='1024',
 'write.rate.limit'='30000',
 --'hoodie.datasource.write.precombine.field'='ts',
 'table.type'='MERGE_ON_READ',
 'changelog.enable'='true',
  'write.operation'='INSERT',
 'compaction.tasks'='1',
 'compaction.delta_commits'='5',
 'compaction.max_memory'='500',
 'hive_sync.enable'='true',
 'hive_sync.mode'='hms',
 'hive_sync.table'='streaming_rcsp_label',
 'hive_sync.db'='kylinstreamdb',
 'hive_sync.support_timestamp'='true',
'hive_sync_metastore.uris'='thrift://xxxxx:21088,thrift://xxxxx:21088',
'hive_sync.conf.dir'='/data1/streaming/flink/hadoop_conf_dir',
'hive_sync.use_kerberos'='true',
'hive_sync.kerberos_principal'='hive/hadoop.hadoop.mrs.xx.write.bocom.com@HADOOP.MRS.XX.WRITE.BOCOM.COM',
'hive_sync.hive_sync.hive_conf'='hdfs://hacluster/user/kylin/flink/write_hadoop_conf/hive-site.xml'
 );

insert into streaming_rcsp_label(
 id                    
,shareId
,sharerUserId
,sharerEcifId
,sharedUserId
,sharedEcifId
,bindTime
,bindStatus
,finishTime
,procTime
,hd_tx_date_h
,hd_tx_date
,bvm_org_code
,bus
,sysdt
)
select 
t1.id,
t1.shareId ,
t1.sharerUserId ,
t1.sharerEcifId ,
t1.sharedUserId ,
t1.sharedEcifId ,
t1.bindTime ,
t1.bindStatus,
t1.finishTime,
t1.procTime,
t2.bvm_org_code,
t3.bus,
cast(NOW() as string) as sysdt, 
concat(substr(cast(NOW() as string),1, 13), ':00:00') as hd_tx_date_h,
substr(cast(NOW() as string),1, 10) as hd_tx_date
from streaming1_cabs_task_mgm_m2 t1
left join  hbase_mb_videomanage for system_time as of t1.procTime as t2
on t1.videoCode=t2.rowkey
left join hbase_mb6_page_bus for system_time as of t1.procTime as t3
on t1.mbk_usdf_evnt_id=t3.rowkey where hd_tx_date >= '2023-12-11' and hd_tx_date <= '2023-12-12';

An error occurred when task3 was executed.
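
(For reference, a hedged sketch of how the replay window in task3 could be pinned to an explicit commit range via Flink dynamic table options, instead of relying only on the hd_tx_date filter; the instant values are placeholders and this is not a confirmed fix for the error above.)

-- Hedged sketch: bounded incremental read of the backup table over an explicit commit range
-- (dynamic table options hints may need to be enabled depending on the Flink version)
select *
from streaming1_cabs_task_mgm_m2
/*+ OPTIONS('read.start-commit'='20231211000000', 'read.end-commit'='20231212235959') */
where hd_tx_date >= '2023-12-11' and hd_tx_date <= '2023-12-12';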

ad1happy2go commented 5 months ago

@fanfanAlice Can you provide the timelines of both tables from when this issue occurred?

stayrascal commented 4 months ago

@fanfanAlice It's a little confusing that all the Hudi table paths in the SQL snippet are under hdfs://hacluster/user/xxx, but the first FileNotFoundException you mentioned points to hdfs://admin-stage/user/tempuser/hudi/hudipath/tbale_name/xxxx, and the screenshot you posted shows hdfs://hacluster/user/kylin/flink/data/streaming1_activity_backxxxx.

So is the FileNotFoundException thrown while reading the streaming1_cabs_task_mgm_m2 table (hdfs://hacluster/user/kylin/flink/data/streaming1_cabs_task_mgm_m2)?

fanfanAlice commented 4 months ago

This issue cannot be closed yet.