datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0
1.04k stars 222 forks source link

mysql -> doris Flink 实时同步 出错 #162

Closed baisui1981 closed 2 years ago

baisui1981 commented 2 years ago

mysql -> doris flink 通过使用order2 库中

-- order2
update totalpayinfo set last_ver=last_ver+2, modify_time= unix_timestamp( now()) where totalpay_id = '991807005451d719015451d87157003d';

-- doris query
select last_ver,modify_time from totalpayinfo where totalpay_id = '991807005451d719015451d87157003d'\G;
====================Dirty Data=====================
DirtyDataEntry[jobId='1ba9717c0607605634dbd0dc0b46e2c1', jobName='mysql_doris', operatorName='Sink: totalpayinfo', dirtyContent='{"arity":37,"rowKind":"INSERT"}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException: 
java.lang.ClassCastException: java.lang.Byte cannot be cast to java.lang.Short
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:91)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:482)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:287)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:96)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:119)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:142)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.ClassCastException: java.lang.Byte cannot be cast to java.lang.Short
    at org.apache.flink.table.data.GenericRowData.getShort(GenericRowData.java:144)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter$1.tinyIntType(TISDorisColumnConverter.java:223)
    at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:111)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$getSerializationConverter$187873ae$1(TISDorisColumnConverter.java:163)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:149)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.lambda$wrapNullableExternalConverter$6e5a4709$1(TISDorisColumnConverter.java:149)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:122)
    at com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TISDorisColumnConverter.toExternal(TISDorisColumnConverter.java:45)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.createInsertVals(DorisLoadClient.java:197)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.processGenericRowData(DorisLoadClient.java:170)
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.process(DorisLoadClient.java:156)
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:89)
    ... 22 more
baisui1981 commented 2 years ago

已经支持 Flink SQL, 不会出错了