apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0
292 stars, 201 forks

[fix](cdc)fix excluding pattern not working #390

Closed: vinlee19 closed this pull request 1 month ago

vinlee19 commented 1 month ago

Proposed changes

Issue Number: close #xxx

Problem Summary:

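When including-tables is set to ".*" and excluding-tables is set to "tbl1|tbl2", the exclusion pattern does not take effect, so the excluded tables are still read by the MySQL CDC source. If those tables (for example tbl1 and tbl2) have no primary key, snapshot chunk splitting fails and the job aborts with the following error: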
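To make the intended semantics concrete, here is a minimal sketch. It assumes that both patterns are Java regular expressions matched against the whole table name and that exclusion takes precedence over inclusion. `TableFilterSketch`, `filter`, and `combinedRegex` are hypothetical names, not the connector's API, and this is not necessarily how the fix in this PR is implemented. `combinedRegex` shows one way to fold both patterns into a single `db.table` regex with a negative lookahead, for a source that accepts only one table-list pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating include/exclude table filtering (not the connector's code). */
public class TableFilterSketch {

    /** Keeps tables that fully match the include pattern and do not fully match the exclude pattern. */
    static List<String> filter(List<String> tables, String including, String excluding) {
        Pattern include = Pattern.compile(including);
        Pattern exclude = (excluding == null || excluding.isEmpty()) ? null : Pattern.compile(excluding);
        List<String> result = new ArrayList<>();
        for (String table : tables) {
            boolean included = include.matcher(table).matches();
            boolean excluded = exclude != null && exclude.matcher(table).matches();
            if (included && !excluded) {
                result.add(table);
            }
        }
        return result;
    }

    /**
     * Folds both patterns into one "db.table" regex: the negative lookahead rejects any
     * table name that fully matches the exclude pattern before the include pattern is tried.
     */
    static String combinedRegex(String database, String including, String excluding) {
        String tablePart = (excluding == null || excluding.isEmpty())
                ? "(?:" + including + ")"
                : "(?!(?:" + excluding + ")$)(?:" + including + ")";
        return Pattern.quote(database) + "\\." + tablePart;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("tbl1", "tbl2", "tbl10", "orders");
        // Exclusion wins over inclusion; "tbl10" survives because matching is full-name.
        System.out.println(filter(tables, ".*", "tbl1|tbl2")); // [tbl10, orders]

        String regex = combinedRegex("ssb_test", ".*", "tbl1|tbl2");
        System.out.println(Pattern.matches(regex, "ssb_test.tbl1"));  // false
        System.out.println(Pattern.matches(regex, "ssb_test.orders")); // true
    }
}
```

Full-name matching is a deliberate choice in this sketch: with partial matching, an exclude pattern of "tbl1" would also drop "tbl10", which is rarely what a user intends.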

org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: MySQL Source -> Sink: Writer -> Sink: Committer' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:472)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Chunk splitting has encountered exception
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.checkSplitterErrors(MySqlSnapshotSplitAssigner.java:574)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getNext(MySqlSnapshotSplitAssigner.java:330)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:123)
    at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:218)
    at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:109)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:557)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:284)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:458)
    ... 8 more
Caused by: java.lang.IllegalStateException: Error when splitting chunks for ssb_test.customer
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:296)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitChunksForRemainingTables(MySqlSnapshotSplitAssigner.java:557)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:158)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.splitChunks(MySqlChunkSplitter.java:119)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:294)
    ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
    at com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getChunkKeyColumn(ChunkUtils.java:67)
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:152)
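The innermost cause above is the MySQL CDC source refusing to split a table that has no primary key unless 'scan.incremental.snapshot.chunk.key-column' is set. The following minimal sketch illustrates that condition. It assumes a hypothetical `TableInfo` holding a table name and its primary-key columns and a hypothetical map of configured chunk key columns; it is not Flink CDC's `ChunkUtils` logic. It shows that once the exclusion is honored, PK-less tables such as `customer` no longer reach the check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical pre-check mirroring the condition behind the ValidationException (not Flink CDC code). */
public class ChunkKeyPrecheckSketch {

    /** Hypothetical table metadata: a table name and its primary-key columns. */
    static final class TableInfo {
        final String name;
        final List<String> primaryKeys;

        TableInfo(String name, List<String> primaryKeys) {
            this.name = name;
            this.primaryKeys = primaryKeys;
        }
    }

    /**
     * Returns the tables selected by the include/exclude patterns (full match, exclusion wins)
     * that have neither a primary key nor a configured chunk key column.
     */
    static List<String> tablesMissingChunkKey(
            List<TableInfo> tables, String including, String excluding, Map<String, String> chunkKeyColumns) {
        Pattern include = Pattern.compile(including);
        Pattern exclude = (excluding == null || excluding.isEmpty()) ? null : Pattern.compile(excluding);
        List<String> missing = new ArrayList<>();
        for (TableInfo table : tables) {
            boolean selected = include.matcher(table.name).matches()
                    && (exclude == null || !exclude.matcher(table.name).matches());
            if (selected && table.primaryKeys.isEmpty() && !chunkKeyColumns.containsKey(table.name)) {
                missing.add(table.name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<TableInfo> tables = List.of(
                new TableInfo("customer", Collections.emptyList()),
                new TableInfo("orders", List.of("id")),
                new TableInfo("tbl1", Collections.emptyList()));
        Map<String, String> noChunkKeys = Map.of();
        // Exclusion ignored: the PK-less tables would fail chunk splitting.
        System.out.println(tablesMissingChunkKey(tables, ".*", "", noChunkKeys)); // [customer, tbl1]
        // Exclusion honored: no selected table still needs a chunk key column.
        System.out.println(tablesMissingChunkKey(tables, ".*", "customer|tbl1", noChunkKeys)); // []
    }
}
```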

Describe the overview of changes.

Checklist(Required)

  1. Does it affect the original behavior: (Yes/No/I Don't know)
  2. Have unit tests been added: (Yes/No/No Need)
  3. Has documentation been added or modified: (Yes/No/No Need)
  4. Does it need to update dependencies: (Yes/No)
  5. Are there any changes that cannot be rolled back: (Yes/No)

Further comments

If this is a relatively large or complex change, start a discussion on dev@doris.apache.org explaining why you chose this solution and what alternatives you considered.