apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0
12.81k stars 3.3k forks source link

issue #9572

Closed 19932537671 closed 2 years ago

19932537671 commented 2 years ago

binlog load、flink-cdc 以及 flink-sql 向 Doris 导入数据时遇到的问题。

需求描述:MySQL 表的日期字段为 bigint 或 timestamp,需要实时按 DAY 动态分区写入 Doris。

问题描述:
1. 将 bigint 和 timestamp 转换为 date 或 datetime 的函数表达式太长。
2. binlog load 的动态分区字段不支持根据 MySQL 的衍生字段动态分区写入 Doris。
3. 使用 flink-cdc 读取 MySQL 并写入 Doris 时,在 Flink SQL 中间添加字段也不支持添加 date 和 datetime 类型的字段。

19932537671 commented 2 years ago

mysql 表

-- Source MySQL table for the tracking-event records (table comment: '埋点记录').
-- Identifiers are backtick-quoted, as SHOW CREATE TABLE would print them.
-- This is required for `system`: SYSTEM is a reserved word as of MySQL 8.0.3,
-- so the unquoted column name is a syntax error on those versions.
CREATE TABLE `record` (
    `id`         bigint NOT NULL AUTO_INCREMENT,
    `user_id`    bigint unsigned NOT NULL COMMENT '用户ID',
    `device_id`  varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '设备ID',
    `idfa`       varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'IOS IDFA',
    `os`         varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '设备系统',
    `os_version` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '系统版本',
    `version`    varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'APP版本',
    `system`     tinyint unsigned NOT NULL DEFAULT '0' COMMENT '1IOS 2安卓',
    `platform`   tinyint unsigned NOT NULL DEFAULT '0' COMMENT '1APP2小程序3H5',
    `event_id`   bigint unsigned NOT NULL DEFAULT '0' COMMENT '事件ID',
    `log_id`     bigint unsigned NOT NULL DEFAULT '0' COMMENT '日志自增ID',
    `base_uri`   varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '当前短路径',
    `event_data` varchar(512) DEFAULT NULL COMMENT '关联数据额外数据',
    -- NOTE(review): presumably a Unix epoch value; the unit (seconds vs
    -- milliseconds) is not stated in this DDL -- confirm with the writer.
    `created_at` bigint unsigned NOT NULL DEFAULT '0',
    PRIMARY KEY (`id`) USING BTREE,
    KEY `INDEX_CREATED_AT` (`created_at`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=481423362 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='埋点记录';

doris表

-- Doris target table: UNIQUE KEY model keyed on (id, user_id, date),
-- range-partitioned on the `date` column and hash-distributed on user_id.
-- The partition list is declared empty; the dynamic_partition.* properties
-- below are enabled with a DAY time unit and the "p" partition-name prefix.
-- NOTE(review): `date` and `system` may be keywords in Doris -- confirm
-- whether they need backtick quoting on the target version.
CREATE TABLE point.record (
    id          BIGINT,
    user_id     BIGINT   COMMENT '用户ID',
    date        DATE,
    device_id   STRING   COMMENT '设备ID',
    idfa        STRING   COMMENT 'IOS IDFA',
    os          STRING   COMMENT '设备系统',
    os_version  STRING   COMMENT '系统版本',
    version     STRING   COMMENT 'APP版本',
    system      TINYINT  COMMENT '1IOS 2安卓',
    platform    TINYINT  COMMENT '1APP2小程序3H5',
    event_id    BIGINT   COMMENT '事件ID',
    log_id      BIGINT   COMMENT '日志自增ID',
    base_uri    STRING   COMMENT '当前短路径',
    event_data  STRING   COMMENT '关联数据额外数据',
    created_at  BIGINT
)
UNIQUE KEY(id, user_id, date)
PARTITION BY RANGE(date) ()
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32"
);

flink mysql映射表

-- Flink SQL source table: reads table `record` of MySQL database `point`
-- through the 'mysql-cdc' connector.
-- Column names are backtick-quoted: Flink SQL lists SYSTEM and VERSION among
-- its reserved keywords, so they cannot be used as bare field names. Quoting
-- every column also matches the INSERT statement that reads from this table.
-- NOTE(review): the MySQL source declares user_id / event_id / log_id /
-- created_at as BIGINT UNSIGNED and system / platform as TINYINT UNSIGNED;
-- confirm the signed types below cannot overflow for the stored values.
CREATE TABLE mysql_record (
    `id`          BIGINT,
    `user_id`     BIGINT   COMMENT '用户ID',
    `device_id`   STRING   COMMENT '设备ID',
    `idfa`        STRING   COMMENT 'IOS IDFA',
    `os`          STRING   COMMENT '设备系统',
    `os_version`  STRING   COMMENT '系统版本',
    `version`     STRING   COMMENT 'APP版本',
    `system`      TINYINT  COMMENT '1IOS 2安卓',
    `platform`    TINYINT  COMMENT '1APP2小程序3H5',
    `event_id`    BIGINT   COMMENT '事件ID',
    `log_id`      BIGINT   COMMENT '日志自增ID',
    `base_uri`    STRING   COMMENT '当前短路径',
    `event_data`  STRING   COMMENT '关联数据额外数据',
    `created_at`  BIGINT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'linux008',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'point',
    'table-name' = 'record'
);

doris映射表

-- Flink SQL sink table: writes to Doris table point.record through the
-- 'doris' connector (FE address linux008:8030).
-- Column names are backtick-quoted: Flink SQL lists DATE, SYSTEM and VERSION
-- among its reserved keywords, so they cannot be used as bare field names.
-- Quoting every column also matches the INSERT statement that writes here.
-- `date` is declared as STRING on the Flink side; the Doris table stores it
-- in a DATE column, so the value is presumably parsed by Doris on load.
CREATE TABLE doris_record (
    `id`          BIGINT,
    `user_id`     BIGINT   COMMENT '用户ID',
    `date`        STRING,
    `device_id`   STRING   COMMENT '设备ID',
    `idfa`        STRING   COMMENT 'IOS IDFA',
    `os`          STRING   COMMENT '设备系统',
    `os_version`  STRING   COMMENT '系统版本',
    `version`     STRING   COMMENT 'APP版本',
    `system`      TINYINT  COMMENT '1IOS 2安卓',
    `platform`    TINYINT  COMMENT '1APP2小程序3H5',
    `event_id`    BIGINT   COMMENT '事件ID',
    `log_id`      BIGINT   COMMENT '日志自增ID',
    `base_uri`    STRING   COMMENT '当前短路径',
    `event_data`  STRING   COMMENT '关联数据额外数据',
    `created_at`  BIGINT
) WITH (
    'connector' = 'doris',
    'fenodes' = 'linux008:8030',
    'table.identifier' = 'point.record',
    -- NOTE(review): batch size 2 / interval 1 look like test values -- confirm.
    'sink.batch.size' = '2',
    'sink.batch.interval'='1',
    'username' = 'root',
    'password' = '123456'
);

插入命令:

-- Streams rows from the MySQL CDC source into the Doris sink, deriving the
-- partition column `date` from the epoch value in `created_at`.
-- Columns are matched to doris_record by position, so the SELECT list order
-- must follow the sink table's column order.
INSERT INTO doris_record
SELECT
    `id`,
    `user_id`,
    -- Only the first 10 digits of created_at are used as epoch seconds, so a
    -- 13-digit millisecond value and a 10-digit second value are both accepted
    -- (same truncation as before). FROM_UNIXTIME takes a Java-style pattern
    -- in Flink SQL, so 'yyyy-MM-dd' yields a zero-padded date string in one
    -- call. The previous expression repeated the conversion three times with
    -- a MySQL-style '%Y-%m-%d' pattern and re-joined YEAR/MONTH/DAYOFMONTH
    -- without padding (e.g. '2022-5-3').
    -- The result is rendered in the session time zone.
    FROM_UNIXTIME(
        CAST(SUBSTRING(CAST(`created_at` AS STRING) FROM 1 FOR 10) AS BIGINT),
        'yyyy-MM-dd'
    ) AS `date`,
    `device_id`,
    `idfa`,
    `os`,
    `os_version`,
    `version`,
    `system`,
    `platform`,
    `event_id`,
    `log_id`,
    `base_uri`,
    `event_data`,
    `created_at`
FROM mysql_record;