apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [connector-cdc] from mysql to starrocks : Generate Splits for table xxxx error #7044

Closed AristoDC closed 1 month ago

AristoDC commented 2 months ago


What happened

When a table has a unique index and one of the indexed columns contains NULL values, org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils#skipReadAndSortSampleData throws a NullPointerException at Arrays.sort(resultsArray); (line 193).
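The failure can be reproduced outside SeaTunnel. Arrays.sort(Object[]) sorts by natural ordering and calls compareTo on the elements, so a single null among the sampled values raises the same NullPointerException from ComparableTimSort that appears in the stack trace. This is a minimal sketch, assuming the sampled split-column values arrive as an Object[] with SQL NULLs mapped to Java null; the class and method names are hypothetical, and this is neither SeaTunnel's code nor its eventual fix.

```java
import java.util.Arrays;
import java.util.Objects;

public class NullSafeSampleSort {

    // Mirrors the failing call: a natural-order sort of sampled split-column values.
    // Returns true if the sort throws a NullPointerException.
    static boolean naturalSortThrowsOnNull(Object[] sample) {
        try {
            Arrays.sort(sample.clone()); // ComparableTimSort calls compareTo on a null element
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Hypothetical null-tolerant variant: drop nulls before sorting,
    // because a null value cannot serve as a chunk boundary.
    static Object[] sortIgnoringNulls(Object[] sample) {
        Object[] nonNull = Arrays.stream(sample).filter(Objects::nonNull).toArray();
        Arrays.sort(nonNull);
        return nonNull;
    }

    public static void main(String[] args) {
        // Assumed sample of biz_type values; rows where biz_type IS NULL come back as null.
        Object[] sample = {3, null, 1, 2, null};
        System.out.println(naturalSortThrowsOnNull(sample));            // true
        System.out.println(Arrays.toString(sortIgnoringNulls(sample))); // [1, 2, 3]
    }
}
```

Filtering nulls only avoids the crash. In SQL, range predicates such as biz_type < ? or biz_type >= ? never match NULL, so rows whose split key is NULL could fall outside every chunk. Avoiding a nullable split column altogether is likely the safer direction.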


Table DDL:

-- auto-generated definition
create table xx
(
    id         bigint(11)   not null primary key,
    out_biz_no varchar(128) null comment '业务单号',     -- business order number
    biz_type   int          null comment '业务数据类型', -- business data type
    constraint xx_unique
        unique (biz_type, out_biz_no)
) comment 'xx表' charset = utf8;                     -- "xx table"

Root cause:

At line 412, splitColumn is set to biz_type, a column of the unique index that can contain NULL. The table is therefore split on biz_type, and the NULL values in that column cause the subsequent NPE.
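One way to avoid this is to never pick a nullable column as the split key. MySQL permits multiple NULL values in a UNIQUE index on nullable columns, so such an index does not guarantee non-null, distinct keys. The following is a minimal sketch of such a selection policy, assuming a simplified model in which the primary key is preferred and a unique-index column is used only if it is declared NOT NULL. The ColumnInfo type and the chooseSplitColumn method are hypothetical illustrations, not SeaTunnel's API or the change adopted upstream.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class SplitColumnChooser {

    // Hypothetical, simplified column metadata (not SeaTunnel's own column type).
    static final class ColumnInfo {
        final String name;
        final boolean nullable;

        ColumnInfo(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    // Prefer the first primary-key column; otherwise fall back to the first
    // unique-index column declared NOT NULL. Nullable unique-index columns are
    // skipped because a MySQL UNIQUE index allows several NULLs.
    static Optional<String> chooseSplitColumn(List<ColumnInfo> primaryKey,
                                              List<ColumnInfo> uniqueIndex) {
        if (!primaryKey.isEmpty()) {
            return Optional.of(primaryKey.get(0).name);
        }
        for (ColumnInfo c : uniqueIndex) {
            if (!c.nullable) {
                return Optional.of(c.name);
            }
        }
        return Optional.empty(); // no safe split column: caller would read the table as one chunk
    }

    public static void main(String[] args) {
        // Columns taken from the DDL in this issue.
        List<ColumnInfo> pk = Arrays.asList(new ColumnInfo("id", false));
        List<ColumnInfo> uk = Arrays.asList(
                new ColumnInfo("biz_type", true),
                new ColumnInfo("out_biz_no", true));

        System.out.println(chooseSplitColumn(pk, uk).orElse("<none>"));                          // id
        System.out.println(chooseSplitColumn(Collections.emptyList(), uk).orElse("<none>"));     // <none>
    }
}
```

Under this policy the table in this issue would be split on id, which is also the key declared in table-names-config.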

SeaTunnel Version

2.3.4 to 2.3.5

SeaTunnel Config

{
    "env" : {
        "parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "base-url" : "jdbc:mysql://xxxxxxx/xxxxx?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&allowMultiQueries=true&autoReconnect=true&failOverReadOnly=false&connectTimeout=3000&socketTimeout=6000&zeroDateTimeBehavior=convertToNull&nullCatalogMeansCurrent=true",
            "password" : "*********",
            "startup.mode" : "initial",
            "driver" : "com.mysql.jdbc.Driver",
            "username" : "xxx",
            "catalog" : {
                "factory" : "MySQL"
            },
            "table-names" : [
                "xx.xx"
            ],
            "database-names" : [
                "xx"
            ],
            "plugin_name" : "MySQL-CDC",
            "table-names-config" : [
                {
                    "table" : "xx.xx",
                    "primaryKeys" : [
                        "id"
                    ]
                }
            ]
        }
    ],
    "sink" : [
        {
            "base-url" : "jdbc:mysql://xxxxxx:9030/xx?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useInformationSchema=true&socketTimeout=60000",
            "enable_upsert_delete" : false,
            "password" : "*****",
            "database" : "xx",
            "starrocks.config" : {
                "format" : "JSON",
                "strip_outer_array" : true
            },
            "data_save_mode" : "APPEND_DATA",
            "nodeUrls" : [
                "xxxxxx:8030"
            ],
            "plugin_name" : "StarRocks",
            "table" : "xx",
            "username" : "xxx"
        }
    ]
}

Running Command

REST API: /hazelcast/rest/maps/submit-job

Error Exception

2024-06-21 16:41:25,066 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=856452939349229571, pipelineId=1, taskGroupId=1}] - [100.xx.xx.xx]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@1d217010
java.lang.RuntimeException: Generate Splits for table xx.xx error
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.generateSplits(AbstractJdbcSourceChunkSplitter.java:112) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:181) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner.getNext(HybridSplitAssigner.java:94) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:160) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.run(IncrementalSourceEnumerator.java:69) ~[?:?]
        at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.stateProcess(SourceSplitEnumeratorTask.java:307) ~[seatunnel-starter.jar:2.3.5-SNAPSHOT]
        at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.call(SourceSplitEnumeratorTask.java:134) ~[seatunnel-starter.jar:2.3.5-SNAPSHOT]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) ~[seatunnel-starter.jar:2.3.5-SNAPSHOT]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) ~[seatunnel-starter.jar:2.3.5-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_272]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_272]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_272]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_272]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
Caused by: java.lang.NullPointerException
        at java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320) ~[?:1.8.0_272]
        at java.util.ComparableTimSort.sort(ComparableTimSort.java:202) ~[?:1.8.0_272]
        at java.util.Arrays.sort(Arrays.java:1246) ~[?:1.8.0_272]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.skipReadAndSortSampleData(MySqlUtils.java:193) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySqlChunkSplitter.sampleDataFromColumn(MySqlChunkSplitter.java:60) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.splitTableIntoChunks(AbstractJdbcSourceChunkSplitter.java:180) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter.generateSplits(AbstractJdbcSourceChunkSplitter.java:82) ~[?:?]
        ... 13 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

Java 1.8.0_391



github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 1 month ago

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.