Closed SinyoWong closed 2 years ago
以下两种情况导致计算血缘失效: 1.建表语句中存在水位线字段; 2.在SQL中使用了UDTF函数;
1. 表中即使有水位线字段,其他字段的血缘关系也应该能被正常解析;
-- Reproduction case: one source table declares a WATERMARK column, and
-- column-level lineage cannot be resolved for the query that joins it.
-- NOTE: the comment must stay on its own line; a `--` comment runs to the
-- end of the line and would otherwise comment out the DDL that follows it.

-- Source table 1: CDC source without a watermark.
CREATE TABLE ods_t_item_wms_bak (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Source table 2: the only table that declares a WATERMARK (on updated_time,
-- which is TIMESTAMP(3) here). It has expiring_days0 where the other tables
-- have is_gift.
CREATE TABLE ods_t_item_wms_bak2 (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    expiring_days0               INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP(3),
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    WATERMARK FOR updated_time AS updated_time,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Source table 3: same columns as source table 1; only the password option differs.
CREATE TABLE ods_t_item_wms_bak3 (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Sink table: Doris target of the INSERT used in the reproduction.
CREATE TABLE ods_t_item_wms_doris (
    id                           BIGINT,
    commodity_code               VARCHAR(512),
    commodity_name               VARCHAR(512),
    item_attribute_template_code VARCHAR(512),
    track_serial_num             INT,
    sn_template_code             VARCHAR(512),
    over_shelf_life_sts          VARCHAR(512),
    min_shelf_life_days          INT,
    in_shelf_life_sts            VARCHAR(512),
    min_shelf_life               INT,
    min_shelf_life_sts           VARCHAR(512),
    expiring_days                INT,
    warn_shelf_life_sts          VARCHAR(512),
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                VARCHAR(512),
    company_code                 VARCHAR(512),
    create_by                    VARCHAR(512),
    create_time                  TIMESTAMP,
    updated_by                   VARCHAR(512),
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               VARCHAR(512),
    allow_fold                   INT,
    area_type                    VARCHAR(512),
    abc_class                    VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8045',
    'table.identifier' = 'test.ods_t_item_wms_doris',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.enable-delete' = 'true'
);
-- Reproduction query: writes to the Doris sink from a three-way join of the
-- source tables on id. Values are matched to sink columns by position.
INSERT INTO ods_t_item_wms_doris
SELECT
    a.id,
    a.commodity_code,
    a.commodity_name,
    a.item_attribute_template_code,
    a.track_serial_num,
    a.sn_template_code,
    a.over_shelf_life_sts,
    a.min_shelf_life_days,
    a.in_shelf_life_sts,
    a.min_shelf_life,
    a.min_shelf_life_sts,
    -- NOTE(review): the pasted text read `b.expiring_days9` and
    -- `c.count_perioda.expiring_days`, neither of which is a valid column
    -- reference. The two `*` operators were most likely consumed as Markdown
    -- emphasis markers; confirm against the original query.
    a.expiring_days
        + b.expiring_days * 9
        + a.cycle_count
        + c.count_period * a.expiring_days AS expiring_days,
    a.warn_shelf_life_sts,
    a.allow_rotate,
    a.is_gift,
    b.allow_down,
    b.over_receiving_ratio,
    a.cycle_count + b.cycle_count AS cycle_count,
    b.count_period,
    b.last_cycle_count_date,
    b.cs_code_manager,
    b.iqc_rule_code,
    b.company_code,
    b.create_by,
    b.create_time,
    b.updated_by,
    b.updated_time,
    b.version,
    b.commodity_type,
    b.allow_fold,
    b.area_type,
    b.abc_class
FROM ods_t_item_wms_bak AS a
INNER JOIN ods_t_item_wms_bak2 AS b
    ON a.id = b.id
INNER JOIN ods_t_item_wms_bak3 AS c
    ON a.id = c.id
RelMetadataQuery 这个类找不到解析watermark的方法
0.6.7
如果是Flink1.14的话在系统配置开启逻辑计划分析血缘,不是的话请试用 dev
晚上好,目前Flink版本是1.14.5,开启了逻辑计划分析血缘,使用的也是dev分支源码,但仍然遇到了这个bug。
PS. 并不是解析报错,而是解析不到对应的表,使用UDTF函数则不会返回任何结果。
收到
https://github.com/HamaWhiteGG/flink-sql-lineage/tree/release-2.0.0
Search before asking
What happened
以下两种情况导致计算血缘失效: 1.建表语句中存在水位线字段; 2.在SQL中使用了UDTF函数;
What you expected to happen
1. 表中即使有水位线字段,其他字段的血缘关系也应该能被正常解析;
How to reproduce
-- Reproduction case: one source table declares a WATERMARK column, and
-- column-level lineage cannot be resolved for the query that joins it.
-- NOTE: the comment must stay on its own line; a `--` comment runs to the
-- end of the line and would otherwise comment out the DDL that follows it.

-- Source table 1: CDC source without a watermark.
CREATE TABLE ods_t_item_wms_bak (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Source table 2: the only table that declares a WATERMARK (on updated_time,
-- which is TIMESTAMP(3) here). It has expiring_days0 where the other tables
-- have is_gift.
CREATE TABLE ods_t_item_wms_bak2 (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    expiring_days0               INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP(3),
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    WATERMARK FOR updated_time AS updated_time,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Source table 3: same columns as source table 1; only the password option differs.
CREATE TABLE ods_t_item_wms_bak3 (
    id                           BIGINT,
    commodity_code               STRING,
    commodity_name               STRING,
    item_attribute_template_code STRING,
    track_serial_num             INT,
    sn_template_code             STRING,
    over_shelf_life_sts          STRING,
    min_shelf_life_days          INT,
    in_shelf_life_sts            STRING,
    min_shelf_life               INT,
    min_shelf_life_sts           STRING,
    expiring_days                INT,
    warn_shelf_life_sts          STRING,
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                STRING,
    company_code                 STRING,
    create_by                    STRING,
    create_time                  TIMESTAMP,
    updated_by                   STRING,
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               STRING,
    allow_fold                   INT,
    area_type                    STRING,
    abc_class                    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test_new',
    'table-name' = 't_item_wms_new'
);

-- Sink table: Doris target of the INSERT used in the reproduction.
CREATE TABLE ods_t_item_wms_doris (
    id                           BIGINT,
    commodity_code               VARCHAR(512),
    commodity_name               VARCHAR(512),
    item_attribute_template_code VARCHAR(512),
    track_serial_num             INT,
    sn_template_code             VARCHAR(512),
    over_shelf_life_sts          VARCHAR(512),
    min_shelf_life_days          INT,
    in_shelf_life_sts            VARCHAR(512),
    min_shelf_life               INT,
    min_shelf_life_sts           VARCHAR(512),
    expiring_days                INT,
    warn_shelf_life_sts          VARCHAR(512),
    allow_rotate                 INT,
    is_gift                      INT,
    allow_down                   INT,
    over_receiving_ratio         DECIMAL(15, 0),
    cycle_count                  INT,
    count_period                 INT,
    last_cycle_count_date        TIMESTAMP,
    cs_code_manager              INT,
    iqc_rule_code                VARCHAR(512),
    company_code                 VARCHAR(512),
    create_by                    VARCHAR(512),
    create_time                  TIMESTAMP,
    updated_by                   VARCHAR(512),
    updated_time                 TIMESTAMP,
    version                      INT,
    commodity_type               VARCHAR(512),
    allow_fold                   INT,
    area_type                    VARCHAR(512),
    abc_class                    VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8045',
    'table.identifier' = 'test.ods_t_item_wms_doris',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.enable-delete' = 'true'
);
-- Reproduction query: writes to the Doris sink from a three-way join of the
-- source tables on id. Values are matched to sink columns by position.
INSERT INTO ods_t_item_wms_doris
SELECT
    a.id,
    a.commodity_code,
    a.commodity_name,
    a.item_attribute_template_code,
    a.track_serial_num,
    a.sn_template_code,
    a.over_shelf_life_sts,
    a.min_shelf_life_days,
    a.in_shelf_life_sts,
    a.min_shelf_life,
    a.min_shelf_life_sts,
    -- NOTE(review): the pasted text read `b.expiring_days9` and
    -- `c.count_perioda.expiring_days`, neither of which is a valid column
    -- reference. The two `*` operators were most likely consumed as Markdown
    -- emphasis markers; confirm against the original query.
    a.expiring_days
        + b.expiring_days * 9
        + a.cycle_count
        + c.count_period * a.expiring_days AS expiring_days,
    a.warn_shelf_life_sts,
    a.allow_rotate,
    a.is_gift,
    b.allow_down,
    b.over_receiving_ratio,
    a.cycle_count + b.cycle_count AS cycle_count,
    b.count_period,
    b.last_cycle_count_date,
    b.cs_code_manager,
    b.iqc_rule_code,
    b.company_code,
    b.create_by,
    b.create_time,
    b.updated_by,
    b.updated_time,
    b.version,
    b.commodity_type,
    b.allow_fold,
    b.area_type,
    b.abc_class
FROM ods_t_item_wms_bak AS a
INNER JOIN ods_t_item_wms_bak2 AS b
    ON a.id = b.id
INNER JOIN ods_t_item_wms_bak3 AS c
    ON a.id = c.id
Anything else
RelMetadataQuery 这个类找不到解析watermark的方法
Version
0.6.7
Are you willing to submit PR?
Code of Conduct