Closed jiewenk closed 4 months ago
额,我重新把机器扩容了,重复一样的步骤,还是在同样的地方报错,求作者大大指点
MySQL增量同步ES以及MySQL增量同步MySQL,错误如上所示
关键错误信息在这里
org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)
能否提供一下mysql的 create table ddl?
关键错误信息在这里
org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive). at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75) at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285) at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113) at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)
能否提供一下mysql的 create table ddl?
主要是从MySQL到ES全量是成功了的,然后增量就失败了 然后是要源端MySQL的表的ddl吗?
这是MySQL表的ddl语句:
CREATE TABLE `video` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '视频ID',
`device_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '所属设备ID:可以为空',
`user_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '所属用户ID',
`likes` bigint DEFAULT NULL COMMENT '点赞数',
`share_text` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '分享文字',
`video_origin` int NOT NULL COMMENT '视频来源:0-终端上传,1-用户自己上传,2-用户自己上传',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL COMMENT '更新时间',
`url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`comment_total` int NOT NULL DEFAULT '0' COMMENT '评论数量(含二级评论)',
`template_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '所使用的的视频模板',
`video_property` int DEFAULT NULL COMMENT '视频属性:0-素材视频,1-完成剪辑视频',
`share_count` int DEFAULT '0' COMMENT '分享总数',
`download_count` int DEFAULT '0' COMMENT '下载总数',
`collection_count` int DEFAULT '0' COMMENT '收藏总数',
`browse_count` int DEFAULT '0' COMMENT '浏览总数',
`score` int DEFAULT '0' COMMENT '视频分数',
`video_visibility` int NOT NULL COMMENT '视频可见性:0-私有,1-仅粉丝可见,2-公开',
`grounding_status` int DEFAULT NULL COMMENT '上架状态:0-已下架,1-上架中,2-审核中,3-审核不通过',
`recommended_status` int DEFAULT NULL COMMENT '推荐状态:0-false,1-true',
`cancel_reason` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '下架原因',
`self_read` int DEFAULT NULL COMMENT '是否自己已读:0-未读,1-已读',
`generate_time` datetime DEFAULT NULL COMMENT '设备上传视频时间',
`follow` int NOT NULL DEFAULT '0' COMMENT '通过视频达到的关注人数',
`duration` float DEFAULT '60' COMMENT '视频时长',
`completion_rate` float NOT NULL DEFAULT '0' COMMENT '完播率',
`origin_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`cover_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
`water_mark_url` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '水印',
`type` tinyint NOT NULL DEFAULT '0' COMMENT '0:1:',
`ld` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '流畅',
`sd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '标清',
`hd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '高清',
`full_hd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '全高清',
`media_type` tinyint NOT NULL DEFAULT '0' COMMENT '媒体类型0:视频,1:图片,默认视频',
`create_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '创建者',
`update_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '更新者',
`publish_time` timestamp NULL DEFAULT NULL COMMENT '视频发布到广场的时间',
`width` int DEFAULT NULL,
`height` int DEFAULT NULL,
`download_switch` tinyint(1) DEFAULT '0' COMMENT '0打开下载开关 1关闭下载开关',
`first_frame_md5` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '视频第一帧的md5值',
`pets_id` varchar(320) NOT NULL DEFAULT '' COMMENT '媒体包含的宠物',
`mood_id` int DEFAULT NULL COMMENT '关联心情id',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_likes` (`likes`) USING BTREE,
KEY `idx_user_id` (`user_id`) USING BTREE,
KEY `idx_video_origin` (`video_origin`) USING BTREE,
KEY `video_first_frame_md5_index` (`first_frame_md5`) USING BTREE,
KEY `idx_device_id` (`device_id`) USING BTREE,
KEY `video_visibility` (`video_visibility`) USING BTREE,
KEY `video_origin` (`video_origin`) USING BTREE,
KEY `grounding_status` (`grounding_status`) USING BTREE,
KEY `video_mix_rec_index` (`video_visibility`,`video_origin`,`grounding_status`,`user_id`,`create_time` DESC) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='视频表';
然后是要源端MySQL的表的ddl吗?
是的。另外,elastic 端schema 编辑页面截一个图看一下
切换到 专家模式 ,把对应的 xml格式的 schema 贴一下
专家模式 ,把对应的 xml格式的 schema 贴一下
{ "column": [{ "array": false, "name": "id", "index": true, "store": true, "pk": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "device_id", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "user_id", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "likes", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "analyzer": "standard", "name": "share_text", "index": true, "store": true, "type": "text", "doc_values": false }, { "array": false, "name": "video_origin", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "create_time", "index": true, "store": true, "type": "date", "doc_values": true }, { "array": false, "name": "update_time", "index": true, "store": true, "type": "date", "doc_values": true }, { "array": false, "name": "url", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "comment_total", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "template_id", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "video_property", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "share_count", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "download_count", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "collection_count", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "browse_count", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "video_visibility", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "grounding_status", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "recommended_status", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "cancel_reason", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "self_read", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "generate_time", "index": true, "store": true, "type": "date", "doc_values": true }, { "array": false, "name": "follow", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "duration", "index": true, "store": true, "type": "double", "doc_values": false }, { "array": false, "name": "completion_rate", "index": true, "store": true, "type": "double", "doc_values": false }, { "array": false, "name": "origin_url", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "cover_url", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "water_mark_url", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "type", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "ld", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "sd", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "hd", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "full_hd", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "media_type", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "create_by", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "update_by", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "name": "publish_time", "index": true, "store": true, "type": "date", "doc_values": true }, { "array": false, "name": "width", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "height", "index": true, "store": true, "type": "long", "doc_values": false }, { "array": false, "name": "download_switch", "index": true, "store": true, "type": "boolean", "doc_values": false }, { "array": false, "name": "first_frame_md5", "index": true, "store": true, "type": "keyword", "doc_values": false }, { "array": false, "analyzer": "standard", "name": "pets_id", "index": true, "store": true, "type": "text", "doc_values": false }, { "array": false, "name": "mood_id", "index": true, "store": true, "type": "long", "doc_values": false }] }
TIS默认生成的
{ "column":[ { "array":false, "name":"id", "index":true, "store":true, "pk":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"device_id", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"user_id", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"likes", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"share_text", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"video_origin", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"create_time", "index":true, "store":true, "type":"date", "doc_values":false }, { "array":false, "name":"update_time", "index":true, "store":true, "type":"date", "doc_values":false }, { "array":false, "name":"url", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"comment_total", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"template_id", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"video_property", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"share_count", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"download_count", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"collection_count", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"browse_count", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"score", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"video_visibility", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"grounding_status", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"recommended_status", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"cancel_reason", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"self_read", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"generate_time", "index":true, "store":true, "type":"date", "doc_values":false }, { "array":false, "name":"follow", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"duration", "index":true, "store":true, "type":"double", "doc_values":false }, { "array":false, "name":"completion_rate", "index":true, "store":true, "type":"double", "doc_values":false }, { "array":false, "name":"origin_url", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"cover_url", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"water_mark_url", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"type", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"ld", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"sd", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"hd", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"full_hd", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"media_type", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"create_by", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"update_by", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"publish_time", "index":true, "store":true, "type":"date", "doc_values":false }, { "array":false, "name":"width", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"height", "index":true, "store":true, "type":"long", "doc_values":false }, { "array":false, "name":"download_switch", "index":true, "store":true, "type":"boolean", "doc_values":false }, { "array":false, "name":"first_frame_md5", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"pets_id", "index":true, "store":true, "type":"keyword", "doc_values":false }, { "array":false, "name":"mood_id", "index":true, "store":true, "type":"long", "doc_values":false } ] }
ok,我查一下
由于设置text类型的类型,内部对应jdbc type为 JDBCTypes.LONGVARCHAR
没有设置colSize
new DataTypeMeta(new DataType(JDBCTypes.LONGVARCHAR))
所以在生成flink任务时会报异常
org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)
只需添加一个Integer.MAX_VALUE参数即可:
new DataTypeMeta(new DataType(JDBCTypes.LONGVARCHAR, Integer.MAX_VALUE))
@jiewenk 有微信联系方式不,发你一个jar替换一下即可
有微信联系方式不,发你一个jar替换一下即可
18482162262,感谢!
成功创建MySQL到ES的增量通道后,如果MySQL的字段值允许为空值,在增量同步时,会报空指针 部分异常堆栈如下:
Caused by: java.lang.NullPointerException
at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$IntGetter.getObject(RowFieldGetterFactory.java:226)
at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:246)
at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getFieldOrNull(RowFieldGetterFactory.java:241)
at com.qlangtech.plugins.incr.flink.cdc.FlinkCol.getRowDataVal(FlinkCol.java:65)
at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.createIndexRequest(ElasticSearchSinkFactory.java:270)
at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.process(ElasticSearchSinkFactory.java:291)
at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.process(ElasticSearchSinkFactory.java:245)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:318)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61)
at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:47)
at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:32)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:150)
at com.qlangtech.plugins.incr.flink.cdc.TISDeserializationSchema.deserialize(TISDeserializationSchema.java:109)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:128)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:110)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:82)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
已经修复 tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/cdc/RowFieldGetterFactory.java
private Object getVal(RowData rowData) {
// 添加是否是空的判断即可
if (rowData.isNullAt(this.colIndex)) {
return null;
}
try {
return getObject((GenericRowData) rowData);
} catch (ClassCastException e) {
throw new RuntimeException("colIdx:" + this.colIndex + ",colName:" + this.colName, e);
}
}
在已经创建了一个MySQL到ES的批量构建并执行成功的前提下,继续创建MySQL到ES的一个增量通道,在部署时的第三步报错 异常信息: