itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data types, maps, and arrays to ClickHouse.
Apache License 2.0
347 stars 149 forks

Data loss #52

Closed: shukai-szj closed this issue 8 months ago

shukai-szj commented 1 year ago

The data produced by INSERT INTO ... SELECT ... FROM a LEFT JOIN b ON ... is missing the rows that satisfy the ON condition; only the rows without a match are written.
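For reference, a LEFT JOIN should never drop left-side rows: rows with a match carry the right-side columns, and rows without one are NULL-padded. A minimal Python sketch of that expected result; the left_join helper and the sample rows are hypothetical and not taken from the reporter's job:

```python
from typing import Optional


def left_join(left: list[dict], right: list[dict], key: str) -> list[tuple[dict, Optional[dict]]]:
    """Hypothetical helper: LEFT JOIN two lists of rows on an equality key."""
    out: list[tuple[dict, Optional[dict]]] = []
    for l_row in left:
        matches = [r_row for r_row in right if r_row[key] == l_row[key]]
        if matches:
            # Matched left rows are kept once per matching right row.
            out.extend((l_row, r_row) for r_row in matches)
        else:
            # Unmatched left rows are kept and NULL-padded (None here).
            out.append((l_row, None))
    return out


# Made-up sample: 4 malls, 2 of which have a detail row with the same oms_code.
malls = [{"id": i, "oms_code": f"c{i}"} for i in range(4)]
details = [{"oms_code": "c0", "address": "A"}, {"oms_code": "c2", "address": "B"}]

rows = left_join(malls, details, "oms_code")
print(len(rows))                                    # 4: no left row is dropped
print(sum(r_row is not None for _, r_row in rows))  # 2 rows carry right-side columns
```

With 40 rows on the left and a right side keyed by a unique oms_code, the same reasoning says the sink should end up with 40 rows, not 29.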

itinycheng commented 1 year ago

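Flink SQL join logic is not pushed down to the connector. Please check whether your SQL is correct, or set the log level to DEBUG, print the generated code, and analyze the specific cause.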

shukai-szj commented 1 year ago

create table ods_mall_info (
  id BIGINT,
  name STRING,
  shortName STRING,
  mallType TINYINT,
  phone STRING,
  provinceCode STRING,
  cityCode STRING,
  countyCode STRING,
  address STRING,
  longitude Decimal(18,6),
  latitude Decimal(18,6),
  state STRING,
  createTime TIMESTAMP,
  createBy BIGINT,
  image STRING,
  codeImageCreateTime TIMESTAMP,
  codeImageState STRING,
  codeImage STRING,
  documentType STRING,
  socialCreditCode STRING,
  licenseEffectiveDate STRING,
  businessScope STRING,
  ICPNumber STRING,
  webAddress STRING,
  registeredCaptial BIGINT,
  paidCaptial BIGINT,
  foundedDate STRING,
  industry STRING,
  registeredAddress STRING,
  businessAddress STRING,
  socialCreditImage STRING,
  corporateName STRING,
  corporatePhone STRING,
  corporateDocumentType STRING,
  corporateDocumentNumber STRING,
  documentStartEndDate STRING,
  documentFrontImage STRING,
  documentBackImage STRING,
  oms_code STRING,
  coefficient STRING,
  updateTime TIMESTAMP,
  updateBy BIGINT,
  delFlag STRING,
  codeImageRecharge STRING,
  mallDistinction STRING,
  showDefaultPic TINYINT,
  mall_dis STRING,
  real_business TINYINT
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://:8123',
  'database-name' = 'hxhdbtest',
  'table-name' = 'ods_mall_info',
  'username' = 'default',
  'password' = ''
);

create table ods_business_lymerch_detail (
  id BIGINT,
  mall_guid String,
  mall_mdmid String,
  mall_class String,
  mall_code String,
  mall_status String,
  mall_type String,
  address String,
  area String,
  bi_code String,
  b_reg_code String,
  b_reg_name String,
  city_code String,
  city_name String,
  consumer_hotline String,
  country_code String,
  country_name String,
  full_name String,
  gd_gps String,
  gps String,
  is_open_up TINYINT,
  is_up_app TINYINT,
  is_up_ly TINYINT,
  is_up_sap TINYINT,
  oms_code String,
  open_date String,
  operation_name String,
  post_code String,
  province_code String,
  prv_name String,
  ps_code String,
  short_name String,
  s_reg_code String,
  telephone String,
  composite_merch String,
  del_flag TINYINT,
  create_date_time timestamp
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://:8123',
  'database-name' = 'hxhdbtest',
  'table-name' = 'ods_business_lymerch_detail',
  'username' = 'default',
  'password' = ''
);

CREATE TABLE ods_mall_city (
  Region_Identifier string,
  Chinese_Abbreviation string,
  Area_Code string,
  Full_Name_Pinyin string,
  Full_Name string,
  Area_Name string,
  Superior_Area_Code string,
  Pinyin string,
  AreaType string
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://:8123',
  'database-name' = 'hxhdbtest',
  'table-name' = 'ods_mall_city',
  'username' = 'default',
  'password' = ''
);

CREATE TABLE dim_mall (
  hxh_mall_id String,
  hxh_mall_name String,
  hxh_mall_short_name String,
  mall_address String,
  mall_type INT,
  mall_kind String,
  mall_area String,
  open_date String,
  oms_code String,
  b_reg_code String,
  b_reg_name String,
  bi_code String,
  ps_code String,
  province_code String,
  province_name String,
  city_code String,
  city_name String,
  country_code String,
  country_name String,
  consumer_hotline String,
  mall_longitude Decimal(18, 6),
  mall_latitude Decimal(18, 6),
  city_longitude Decimal(18, 6),
  city_latitude Decimal(18, 6),
  gd_gps String,
  tecent_gps String,
  hxh_mall_status String,
  mall_industry String,
  mall_distinction String,
  ly_mall_id String,
  ly_mall_guid String,
  ly_mall_mdmid String,
  ly_mall_code String,
  ly_mall_class String,
  ly_mall_name String,
  state String,
  ori_create_time TIMESTAMP(0),
  ori_update_time TIMESTAMP(0),
  dim_create_date TIMESTAMP(0),
  PRIMARY KEY (hxh_mall_id) NOT ENFORCED
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://:8123',
  'database-name' = 'hxhdbtest',
  'table-name' = 'dim_mall',
  'username' = 'default',
  'password' = ''
);

INSERT INTO dim_mall
SELECT
  cast(mi.id AS VARCHAR), -- mall code
  mi.name AS hxh_mall_name, -- mall name
  mi.shortName AS hxh_mall_short_name, -- mall short name
  bld.address AS mall_address, -- address
  mi.mallType AS mall_type, -- mall type
  bld.mall_type AS mall_kind, -- mall category
  bld.area AS mall_area, -- mall area
  bld.open_date AS open_date, -- opening date
  mi.oms_code AS oms_code, -- Longyi mall link code
  bld.b_reg_code AS b_reg_code, -- business development center code
  bld.b_reg_name AS b_reg_name, -- business development center name
  bld.bi_code AS bi_code, -- BI code
  bld.ps_code AS ps_code, -- PS code
  mi.provinceCode AS province_code, -- province code
  oc_province.Area_Name AS province_name, -- province name
  mi.cityCode AS city_code, -- city code
  oc_city.Area_Name AS city_name, -- city name
  mi.countyCode AS country_code, -- district code
  oc_county.Area_Name AS country_name, -- district name
  bld.consumer_hotline AS consumer_hotline, -- consumer hotline
  mi.longitude AS longitude, -- mall longitude
  mi.latitude AS latitude,
  0 AS mall_longitude,
  0 AS mall_latitude, -- mall latitude
  bld.gd_gps AS gd_gps, -- Amap (Gaode) coordinates
  bld.gps AS tecent_gps, -- Tencent Maps coordinates
  mi.state AS hxh_mall_status, -- mall status
  mi.industry AS mall_industry, -- industry
  mi.mallDistinction AS mall_distinction, -- mall flag: 1 = visible mall, 2 = hidden mall
  cast(bld.id AS VARCHAR) AS ly_mall_id, -- Longyi mall ID
  bld.mall_guid AS ly_mall_guid, -- Longyi mall guid
  bld.mall_mdmid AS ly_mall_mdmid, -- Longyi mall mdmid
  bld.mall_code AS ly_mall_code, -- Longyi mall number
  bld.mall_class AS ly_mall_class, -- Longyi mall name
  bld.full_name as ly_mall_name, -- merchant onboarding status (1: available)
  bld.mall_status,
  mi.createTime AS business_create_date,
  mi.updateTime AS update_create_date,
  now()
FROM ods_mall_info mi
LEFT JOIN (
  SELECT * FROM ods_business_lymerch_detail WHERE del_flag = 0
) bld ON mi.oms_code = bld.oms_code
LEFT JOIN (
  SELECT Area_Code, Area_Name FROM ods_mall_city WHERE AreaType = '2'
) oc_province ON oc_province.Area_Code = mi.provinceCode
LEFT JOIN (
  SELECT Area_Code, Area_Name FROM ods_mall_city WHERE AreaType = '3'
) oc_city ON oc_city.Area_Code = mi.cityCode
LEFT JOIN (
  SELECT Area_Code, Area_Name FROM ods_mall_city WHERE AreaType = '4'
) oc_county ON oc_county.Area_Code = mi.countyCode;

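Above is the original SQL. After the LEFT JOIN the row count really does drop: ods_mall_info has 40 rows and ods_business_lymerch_detail has only 11, yet the result has 29 rows (the 11 rows that match the right table are missing). If I change the sink table to 'connector' = 'print', I can clearly see a three-step +, -, + sequence for each of those 11 rows.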
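The +, -, + sequence is what a streaming regular join emits when a left row arrives before its right-side match: first the NULL-padded row, then a retraction of it, then the joined row. The sketch below is a simplified Python model of that behavior, not Flink's operator code; it assumes at most one right row per key and only insert/delete record kinds, and streaming_left_join and the sample events are hypothetical:

```python
from typing import Optional


def streaming_left_join(events: list[tuple[str, dict]], key: str) -> list[tuple[str, dict, Optional[dict]]]:
    """Simplified model of a streaming LEFT JOIN's changelog output.

    Assumes at most one right-side row per key and insert-only inputs.
    """
    left_by_key: dict = {}   # key -> left rows seen so far
    right_by_key: dict = {}  # key -> the right row for that key, once seen
    changelog = []
    for side, row in events:
        k = row[key]
        if side == "left":
            left_by_key.setdefault(k, []).append(row)
            # Emit the joined row if the match already arrived, otherwise NULL-padded.
            changelog.append(("+I", row, right_by_key.get(k)))
        else:
            right_by_key[k] = row
            for l_row in left_by_key.get(k, []):
                # Retract the NULL-padded row emitted earlier, then emit the joined row.
                changelog.append(("-D", l_row, None))
                changelog.append(("+I", l_row, row))
    return changelog


# Left rows arrive first; the single matching right row arrives afterwards.
events = [("left", {"id": i, "oms_code": f"c{i}"}) for i in range(3)]
events.append(("right", {"oms_code": "c1", "address": "A"}))

changelog = streaming_left_join(events, "oms_code")
for kind, l_row, r_row in changelog:
    print(kind, l_row["id"], r_row)  # mall 1 goes through +I, -D, +I
```

Applied strictly in order, this changelog still leaves exactly one row per mall, so the join output itself is complete; any loss has to happen where the sink applies the changes.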

When I add 'sink.batch-size' = '1' to the sink table, no data is lost.
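Why a batch size of 1 avoids the loss can be illustrated with a toy sink. Assume, hypothetically, that the sink buffers records and, on each flush, executes all of the batch's inserts before all of its deletes, and that a delete removes every row with the same primary key. The retraction of the NULL-padded row then also removes the joined row inserted right after it in the same batch. This is an illustration of a delete-ordering bug, not the connector's actual code; apply_batched and the sample changelog are hypothetical:

```python
def apply_batched(changelog: list[tuple[str, dict]], batch_size: int, key: str = "id") -> list[dict]:
    """Toy sink that flushes every `batch_size` records (hypothetical model).

    Within a flush it runs all inserts first and all deletes afterwards,
    and a delete removes every row with the same primary key.
    """
    table: list[dict] = []  # rows currently stored in the simulated table

    def flush(batch: list[tuple[str, dict]]) -> None:
        for kind, row in batch:
            if kind == "+I":
                table.append(row)
        for kind, row in batch:
            if kind == "-D":
                table[:] = [r for r in table if r[key] != row[key]]

    batch: list[tuple[str, dict]] = []
    for record in changelog:
        batch.append(record)
        if len(batch) >= batch_size:
            flush(batch)
            batch = []
    if batch:  # flush the remainder, like a final or interval-based flush
        flush(batch)
    return table


changelog = [
    ("+I", {"id": 0, "address": None}),
    ("+I", {"id": 1, "address": None}),
    ("+I", {"id": 2, "address": None}),
    ("-D", {"id": 1, "address": None}),  # retract the NULL-padded mall 1
    ("+I", {"id": 1, "address": "A"}),   # joined mall 1
]

print(sorted(r["id"] for r in apply_batched(changelog, batch_size=100)))  # [0, 2]
print(sorted(r["id"] for r in apply_batched(changelog, batch_size=1)))    # [0, 1, 2]
```

With a large batch the matched mall disappears, mirroring 40 - 11 = 29 rows; with one record per batch every change is applied in arrival order.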

itinycheng commented 1 year ago

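It may be a problem with the order in which deletes are executed: https://github.com/itinycheng/flink-connector-clickhouse/issues/24. Looking at it now, though, this should be a bug that occurs when writing directly to local or distributed tables; I'll fix it a bit later.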
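One generic way to make a batched sink order-safe is to reduce each batch to the last change per primary key, delete every key the batch touched, and then insert the surviving final rows. This is a hypothetical Python sketch of that technique, not necessarily what the fix commit does; apply_batched_reduced and the sample changelog are assumptions:

```python
def apply_batched_reduced(changelog: list[tuple[str, dict]], batch_size: int, key: str = "id") -> list[dict]:
    """Toy order-safe sink (hypothetical): keep only the last change per key in each batch."""
    table: list[dict] = []  # rows currently stored in the simulated table

    def flush(batch: list[tuple[str, dict]]) -> None:
        last: dict = {}  # primary key -> last (kind, row) seen in this batch
        for kind, row in batch:
            last[row[key]] = (kind, row)
        # Delete every key touched by the batch, then insert the final versions.
        table[:] = [r for r in table if r[key] not in last]
        for kind, row in last.values():
            if kind == "+I":
                table.append(row)

    batch: list[tuple[str, dict]] = []
    for record in changelog:
        batch.append(record)
        if len(batch) >= batch_size:
            flush(batch)
            batch = []
    if batch:
        flush(batch)
    return table


changelog = [
    ("+I", {"id": 0, "address": None}),
    ("+I", {"id": 1, "address": None}),
    ("+I", {"id": 2, "address": None}),
    ("-D", {"id": 1, "address": None}),
    ("+I", {"id": 1, "address": "A"}),
]

for size in (1, 100):
    rows = sorted(apply_batched_reduced(changelog, size), key=lambda r: r["id"])
    print(size, [(r["id"], r["address"]) for r in rows])
```

Collapsing per key gives up replaying every intermediate change, but the state after each flush matches applying the changes one by one, regardless of batch size.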

itinycheng commented 1 year ago

The issue has been fixed: https://github.com/itinycheng/flink-connector-clickhouse/commit/06d0aedb1cb606d85cb328f4bbe6befd0892d4ca