datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0

Flink sync error #378

Closed shizenghua closed 3 weeks ago

shizenghua commented 1 month ago
Caused by: java.lang.RuntimeException: colIdx:1,colName:id
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:281)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getFieldOrNull(RowFieldGetterFactory.java:271)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter$DorisSerializationConverter.serialize(TISDorisColumnConverter.java:154)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter$DorisSerializationConverter.serialize(TISDorisColumnConverter.java:145)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:135)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:135)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:118)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:43)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.createInsertVals(DorisLoadClient.java:198)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.processGenericRowData(DorisLoadClient.java:169)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.process(DorisLoadClient.java:155)
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:89)
    ... 11 more
Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryStringData cannot be cast to class java.lang.Integer (org.apache.flink.table.data.binary.BinaryStringData is in unnamed module of loader 'app'; java.lang.Integer is in module java.base of loader 'bootstrap')
    at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$IntGetter.getObject(RowFieldGetterFactory.java:256)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:279)
    ... 22 more

The id column of this table is of type int.

baisui1981 commented 1 month ago

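Questions

  1. Have you configured the transformer feature?
  2. What are the source and target ends of the sync?
  3. Could you provide the DDL of the corresponding source table, along with one INSERT statement?
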
shizenghua commented 1 month ago

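1. No transformer is configured. 2. MySQL 8.0 -> Doris 2.1.6. In testing with more than a hundred tables, only this one table has the problem; all the others are fine. With this table removed, real-time sync works normally. Full sync of this table also works normally, though.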

This is the exported CREATE TABLE SQL:

CREATE TABLE `guest_ex` (
  `id` int NOT NULL COMMENT '客户扩展表',
  `gzip` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '邮编',
  `gcode` varchar(10) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '区号',
  `gwork` int NOT NULL COMMENT '职业',
  `gbirthday` bigint NOT NULL COMMENT '生日',
  `gbackdrop` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '特殊背景',
  `glike` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '兴趣爱好',
  `gcollection` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '收藏喜好',
  `gidcard` varchar(20) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '身份证',
  `gincome` int NOT NULL DEFAULT '0' COMMENT '收入情况',
  `ghealthmanage` text CHARACTER SET utf8 COLLATE utf8_bin COMMENT '健康管理',
  `gcarline` text CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '乘车路线',
  `auto_id` int NOT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb3 COLLATE=utf8_bin;

This is the auto-generated DataX script:

"querySql":[
 "SELECT `id`,`gzip`,`gcode`,`gwork`,`gbirthday`,`gbackdrop`,`glike`,`gcollection`,`gidcard`,`gincome`,`ghealthmanage`,`gcarline`,`auto_id` FROM `guest_ex` "
 ],
................................................
"column":[
  "id",
  "gzip",
  "gcode",
  "gwork",
  "gbirthday",
  "gbackdrop",
  "glike",
  "gcollection",
  "gidcard",
  "gincome",
  "ghealthmanage",
  "gcarline",
  "auto_id"
  ],

This is the auto-generated Table DDL Script:

CREATE TABLE `guest_ex`
(
    `auto_id`        INT NOT NULL,
    `id`             INT,
    `gzip`           VARCHAR(30),
    `gcode`          VARCHAR(30),
    `gwork`          INT,
    `gbirthday`      BIGINT,
    `gbackdrop`      STRING,
    `glike`          STRING,
    `gcollection`    STRING,
    `gidcard`        VARCHAR(60),
    `gincome`        INT,
    `ghealthmanage`  STRING,
    `gcarline`       STRING
)
 ENGINE=olap
UNIQUE KEY(`auto_id`)
DISTRIBUTED BY HASH(`auto_id`)
BUCKETS 10
PROPERTIES("replication_num" = "1"  )

This is the parsed MySQL binary log content:

### UPDATE `cti`.`guest_ex`
### WHERE
###   @1=2257020 /* INT meta=0 nullable=0 is_null=0 */
###   @2='' /* VARSTRING(30) meta=30 nullable=0 is_null=0 */
###   @3='' /* VARSTRING(30) meta=30 nullable=0 is_null=0 */
###   @4=0 /* INT meta=0 nullable=0 is_null=0 */
###   @5=0 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @6='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @7='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @8='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @9='' /* VARSTRING(60) meta=60 nullable=0 is_null=0 */
###   @10=0 /* INT meta=0 nullable=0 is_null=0 */
###   @11=NULL /* BLOB/TEXT meta=2 nullable=1 is_null=1 */
###   @12='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @13=3743477 /* INT meta=0 nullable=0 is_null=0 */
### SET
###   @1=2257020 /* INT meta=0 nullable=0 is_null=0 */
###   @2='' /* VARSTRING(30) meta=30 nullable=0 is_null=0 */
###   @3='' /* VARSTRING(30) meta=30 nullable=0 is_null=0 */
###   @4=0 /* INT meta=0 nullable=0 is_null=0 */
###   @5=0 /* LONGINT meta=0 nullable=0 is_null=0 */
###   @6='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @7='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @8='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @9='' /* VARSTRING(60) meta=60 nullable=0 is_null=0 */
###   @10=0 /* INT meta=0 nullable=0 is_null=0 */
###   @11=NULL /* BLOB/TEXT meta=2 nullable=1 is_null=1 */
###   @12='' /* BLOB/TEXT meta=2 nullable=0 is_null=0 */
###   @13=3743477 /* INT meta=0 nullable=0 is_null=0 */

Why is an error reported for the id field? The id column is clearly of type int.
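
One reading that is consistent with the trace and the two DDLs above, though not confirmed by the maintainers in this thread: the exception names `colIdx:1, colName:id`, and `id` sits at index 1 in the generated Doris DDL, while index 1 of the binlog row (source column order) is `gzip`, a string. The Python sketch below only simulates that kind of positional mismatch. `get_int`, `read_id_by_sink_position`, and `read_id_by_name` are hypothetical stand-ins, not TIS, Chunjun, or Flink code.

```python
# Column order of the MySQL source table (matches @1..@13 in the binlog above).
SOURCE_ORDER = ["id", "gzip", "gcode", "gwork", "gbirthday", "gbackdrop", "glike",
                "gcollection", "gidcard", "gincome", "ghealthmanage", "gcarline",
                "auto_id"]

# Column order of the generated Doris DDL (auto_id appears first there).
SINK_ORDER = ["auto_id", "id", "gzip", "gcode", "gwork", "gbirthday", "gbackdrop",
              "glike", "gcollection", "gidcard", "gincome", "ghealthmanage",
              "gcarline"]

# Values of the binlog UPDATE event above, laid out in source order.
ROW = [2257020, "", "", 0, 0, "", "", "", "", 0, None, "", 3743477]


def get_int(row, pos):
    """Hypothetical positional getter: fails if the slot does not hold an int."""
    value = row[pos]
    if not isinstance(value, int):
        raise TypeError(f"position {pos} holds {type(value).__name__}, not int")
    return value


def read_id_by_sink_position(row):
    """Read `id` via its index in the sink DDL (assumed behaviour, for illustration)."""
    return get_int(row, SINK_ORDER.index("id"))


def read_id_by_name(row):
    """Read `id` via its index in the source order, which is how the row is laid out."""
    return get_int(row, SOURCE_ORDER.index("id"))
```

Under this assumption, `read_id_by_sink_position(ROW)` looks at slot 1, finds the string value of `gzip`, and raises, which mirrors the `BinaryStringData cannot be cast to java.lang.Integer` message even though `id` itself is an int. `read_id_by_name(ROW)` returns the expected id.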

shizenghua commented 1 month ago

Running real-time sync for this table alone gives the same result: it fails with the error.

2024-10-23 13:58:38,354 WARN  com.dtstack.chunjun.dirty.log.LogDirtyDataCollector          [] - 
====================Dirty Data=====================
DirtyDataEntry[jobId='a4e77624fb8f86252c00be03eb4c549d', jobName='guest_ex', operatorName='Sink: guest_ex', dirtyContent='{"rowKind":"UPDATE_AFTER","arity":13}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException: 
java.lang.RuntimeException: colIdx:1,colName:id
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:91)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:483)
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.lambda$writeRecordInternal$0(DorisHttpOutputFormat.java:131)
    at java.base/java.util.ArrayList.forEach(Unknown Source)
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeRecordInternal(DorisHttpOutputFormat.java:131)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$1(BaseRichOutputFormat.java:460)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: colIdx:1,colName:id
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:281)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getFieldOrNull(RowFieldGetterFactory.java:271)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter$DorisSerializationConverter.serialize(TISDorisColumnConverter.java:154)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter$DorisSerializationConverter.serialize(TISDorisColumnConverter.java:145)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:135)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:135)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:118)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:43)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.createInsertVals(DorisLoadClient.java:198)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.processGenericRowData(DorisLoadClient.java:169)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.process(DorisLoadClient.java:155)
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:89)
    ... 11 more
Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryStringData cannot be cast to class java.lang.Integer (org.apache.flink.table.data.binary.BinaryStringData is in unnamed module of loader 'app'; java.lang.Integer is in module java.base of loader 'bootstrap')
    at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$IntGetter.getObject(RowFieldGetterFactory.java:256)
    at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:279)
    ... 22 more
', fieldName='null', createTime=2024-10-23 13:58:38.069]
baisui1981 commented 4 weeks ago

OK, got it. I'll try it locally.

baisui1981 commented 4 weeks ago
CREATE TABLE `guest_ex`
(
    `auto_id`        INT NOT NULL, 
    `id`             INT,
    `gzip`           VARCHAR(30),
    `gcode`          VARCHAR(30),
    `gwork`          INT,
    `gbirthday`      BIGINT,
    `gbackdrop`      STRING,
    `glike`          STRING,
    `gcollection`    STRING,
    `gidcard`        VARCHAR(60),
    `gincome`        INT,
    `ghealthmanage`  STRING,
    `gcarline`       STRING
)
 ENGINE=olap
UNIQUE KEY(`auto_id`)
DISTRIBUTED BY HASH(`auto_id`)
BUCKETS 10
PROPERTIES("replication_num" = "1"  )

In the DDL above, was auto_id set as the primary key on the primary-key settings page?

[Screenshot, 2024-10-24 09:51:30]

shizenghua commented 4 weeks ago

All the settings here are correct.

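The problem is now solved. The fix: in MySQL, I created a new empty table with the auto-increment primary key auto_id moved to the first column [I'm not sure whether the column order is actually related], copied the several million rows from the old table into the new one, dropped the original table, and renamed the new table to the original name. After that the error no longer occurred, and real-time sync works normally.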
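
Whether column order is really the cause is not established in this thread. As a rough way to test the idea, the sketch below lists the positions at which a source column order and the generated Doris column order disagree. The post-workaround order is an assumption (`auto_id` first, the remaining columns unchanged), and `misaligned_positions` is a hypothetical helper, not part of TIS.

```python
def misaligned_positions(source_order, sink_order):
    """Return (index, source_col, sink_col) for each position where the names differ."""
    return [(i, s, t)
            for i, (s, t) in enumerate(zip(source_order, sink_order))
            if s != t]


# The twelve non-key columns, in the order shown in the original MySQL DDL.
OTHER_COLUMNS = ["id", "gzip", "gcode", "gwork", "gbirthday", "gbackdrop", "glike",
                 "gcollection", "gidcard", "gincome", "ghealthmanage", "gcarline"]

# Original MySQL table: auto_id is the last column.
SOURCE_BEFORE = OTHER_COLUMNS + ["auto_id"]

# Assumed layout after the workaround: auto_id moved to the front.
SOURCE_AFTER = ["auto_id"] + OTHER_COLUMNS

# Generated Doris DDL: auto_id first, then the remaining columns.
DORIS_ORDER = ["auto_id"] + OTHER_COLUMNS

if __name__ == "__main__":
    for idx, src, dst in misaligned_positions(SOURCE_BEFORE, DORIS_ORDER):
        print(f"index {idx}: source has {src}, sink expects {dst}")
```

With the original layout every position is shifted by one, so a purely positional reader would see the wrong column everywhere. With the assumed post-workaround layout the two orders coincide, which would explain why the error disappeared if positional access is indeed what happens.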

baisui1981 commented 4 weeks ago

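in MySQL, I created a new empty table with the auto-increment primary key auto_id moved to the first column [I'm not sure whether the column order is actually related], copied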

OK, I'll look into it further to see whether there is a better approach.