Closed koozel closed 1 year ago
here is my sql script
-- source1 DROP TABLE IF EXISTS `dwd_status_log`; CREATE TABLE `dwd_status_log` ( mechine STRING, timenow STRING, `value` STRING, pre_value STRING, PRIMARY KEY (timenow) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'dwd_status_log', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json' ); -- source2 DROP TABLE IF EXISTS dim_equipment_info; CREATE TABLE IF NOT EXISTS dim_equipment_info ( id STRING ,station_type STRING ,equipment_no STRING ,emp_name STRING ,emp_phone STRING ,vendor STRING ,site STRING ,`floor` STRING ,line STRING ,model STRING ,stage STRING ,id_value STRING ,emp_info STRING ,PRIMARY KEY ( id ) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true', 'table-name' = 'dim_equipment_info', 'username' = 'root', 'password' = 'xxx' ); -- sink DROP TABLE IF EXISTS dws_alarm_status_count; CREATE TABLE IF NOT EXISTS dws_alarm_status_count ( mechine_id STRING ,mechine_name STRING ,trigger_date STRING ,alarm_status_count BIGINT ,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'dws_alarm_status_count', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json' ); -- trans INSERT INTO dws_alarm_status_count SELECT dsl.mechine AS mechine_id, dei.equipment_no AS mechine_name, DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`, COUNT(*) AS alarm_status_count FROM dwd_status_log dsl LEFT JOIN dim_equipment_info dei ON dsl.mechine = dei.id WHERE dsl.`value` = '4' GROUP BY DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine,dei.equipment_no;
it missed the last column lineage data.
here is my sql script
it missed the last column lineage data.