StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0
195 stars 156 forks source link

源表为中文名,数据同步异常 #211

Open byd-android-2017 opened 1 year ago

byd-android-2017 commented 1 year ago

只要表名是纯英文、其它配置保持不变,数据同步一切正常;如果将表名改为中文名,作业仍能正常启动,但 sink 端在 checkpoint 时开启事务会抛出异常,导致数据无法同步。

java.io.IOException: Could not perform checkpoint 16 for operator SourceConversion(table=[Unregistered_DataStream_91], fields=[姓名, 总分数]) -> NotNullEnforcer(fields=[姓名, 总分数]) -> Sink: Sink(table=[default_catalog.default_database.学生分数_9], fields=[姓名, 总分数]) (1/1)#12.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1271)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:495)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
    Suppressed: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction start failed, db : newmis
        at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.AssertNotException(StarRocksSinkManagerV2.java:368)
        at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.flush(StarRocksSinkManagerV2.java:300)
        at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:184)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1032)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1018)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:925)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
        ... 3 more
    Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction start failed, db : newmis
        at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:120)
        at com.starrocks.connector.flink.manager.TransactionTableRegion.streamLoad(TransactionTableRegion.java:328)
        at com.starrocks.connector.flink.manager.TransactionTableRegion.flush(TransactionTableRegion.java:233)
        at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.lambda$init$0(StarRocksSinkManagerV2.java:130)
        ... 1 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 16 for operator SourceConversion(table=[Unregistered_DataStream_91], fields=[姓名, 总分数]) -> NotNullEnforcer(fields=[姓名, 总分数]) -> Sink: Sink(table=[default_catalog.default_database.学生分数_9], fields=[姓名, 总分数]) (1/1)#12. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1326)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1255)
    ... 22 more
Caused by: java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction start failed, db : newmis
    at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.AssertNotException(StarRocksSinkManagerV2.java:368)
    at com.starrocks.connector.flink.manager.StarRocksSinkManagerV2.flush(StarRocksSinkManagerV2.java:300)
    at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.snapshotState(StarRocksDynamicSinkFunctionV2.java:197)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
    ... 33 more
Caused by: [CIRCULAR REFERENCE: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction start failed, db : newmis]

MySQL源表

-- Report the MySQL server version together with the OS, machine architecture
-- and zlib version it was compiled against.
SELECT
    @@version,
    @@version_compile_os,
    @@version_compile_machine,
    @@version_compile_zlib
| @@version | @@version_compile_os | @@version_compile_machine | @@version_compile_zlib |
| --- | --- | --- | --- |
| 8.0.28 | Linux | x86_64 | 1.2.11 |
-- MySQL source table for the reproduction: keyed by 姓名 (name) and carrying
-- 总分数 (total score). The 姓名 column overrides the table's utf8mb4 default
-- with its own utf8 / utf8_general_ci character set and collation.
CREATE TABLE `newmis2`.`学生分数` (
    `姓名` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `总分数` BIGINT NOT NULL,
    PRIMARY KEY (`姓名`)
)
    ENGINE = InnoDB
    AVG_ROW_LENGTH = 8192
    DEFAULT CHARACTER SET = utf8mb4
    DEFAULT COLLATE = utf8mb4_0900_ai_ci;

-- Seed the source table with three sample rows (name, total score).
INSERT INTO `newmis2`.`学生分数` (`姓名`, `总分数`)
VALUES
    ('李小平', 100),
    ('李思佳', 120),
    ('贺双元', 150);

StarRocks目标表

-- List every server variable whose name starts with "version".
SHOW VARIABLES LIKE 'version%'
| version | version_comment |
| --- | --- |
| 5.1.0 | StarRocks version 2.5.3 |
-- StarRocks sink table for the reproduction: a Duplicate Key OLAP table with the
-- same two columns as the MySQL source, hashed on 姓名 into a single bucket.
-- NOTE(review): StarRocks VARCHAR lengths are counted in bytes, so varchar(30)
-- holds roughly ten 3-byte UTF-8 Chinese characters, while the source column is
-- VARCHAR(255) characters - confirm this width is intended.
-- NOTE(review): the source declares PRIMARY KEY (姓名) but this table uses the
-- Duplicate Key model; presumably a Primary Key table is required if CDC
-- updates/deletes must be applied by key - verify against the connector docs.
CREATE TABLE `学生分数` (
    `姓名` VARCHAR(30) NOT NULL COMMENT "",
    `总分数` BIGINT(20) NOT NULL COMMENT ""
)
ENGINE = OLAP
DUPLICATE KEY (`姓名`)
COMMENT "OLAP"
DISTRIBUTED BY HASH (`姓名`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false",
    "compression" = "LZ4"
);

ETL作业

扩展包

-- CDC synchronisation job for the reproduction: captures the MySQL table
-- newmis2.学生分数 with the 'mysql-cdc' connector (initial snapshot, then changes)
-- and writes it to the StarRocks database 'newmis' through the 'starrocks' sink
-- connector using JSON stream loads with 'exactly-once' semantics.
-- (EXECUTE CDCSOURCE is not standard SQL; presumably Dinky's whole-database sync
-- syntax - confirm. Options prefixed with 'sink.' appear to be forwarded to the
-- sink connector - confirm.)
-- NOTE(review): 'sink.table-name' = '${tableName}' is a placeholder; presumably it
-- is replaced by the source table name, so the sink table would carry the Chinese
-- name 学生分数 that triggers the reported failure - verify.
-- NOTE(review): 'debezium.database.dbname' is 'newmis' while 'table-name' selects
-- from schema newmis2 - confirm this mismatch is intended.
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '10.1.*.****',
  'port' = '4417',
  'username' = 'root',
  'password' = '*********!',
  'checkpoint' = '3000',
  'source.server-time-zone' = 'Asia/Shanghai',
  'source.useUnicode' = 'yes',
  'source.characterEncoding' = 'UTF-8',
  'scan.startup.mode' = 'initial',
  'debezium.datetime.format.timestamp.zone' = 'UTC+8',
  'debezium.database.dbname' = 'newmis',
  'parallelism' = '1',
  'table-name' = 'newmis2\.学生分数',
  'sink.connector' = 'starrocks',
  'sink.jdbc-url' = 'jdbc:mysql://10.1.x.xx:9030,10.1.x.xx:9030',
  'sink.load-url' = '10.1.x.xx:8030;10.1.x.xx:8030',
  'sink.username' = 'root',
  'sink.password' = '',
  'sink.sink.db' = 'newmis',
  'sink.database-name' = 'newmis',
  'sink.table-name' = '${tableName}',
  'sink.sink.properties.format' = 'json',
  'sink.sink.properties.strip_outer_array' = 'true',
  'sink.sink.properties.timeout' = '60000',
  'sink.sink.label-prefix' = 'mis-',
  'sink.sink.max-retries' = '10',
  'sink.sink.semantic' = 'exactly-once',
  'sink.sink.buffer-flush.interval-ms' = '15000',
  'sink.sink.buffer-flush.max-bytes' = '74002019',
  'sink.sink.connect.timeout-ms' = '2000',
  'sink.sink.parallelism' = '1'
);