apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[tidb] decode fails cause task failed and inconsistent data synchronization #1154

Closed coralzu closed 8 months ago

coralzu commented 2 years ago

Describe the bug (Please use English): When using the tidb connector to synchronize data from TiDB to Elasticsearch, a decoding exception occurs, causing task failure and inconsistent data. Even after restarting the task from a checkpoint, the data remains inconsistent.

Environment:

To Reproduce

  1. Create two test tables in TIDB
    
    CREATE TABLE `xw_left` (
    `left_id` int(11) NOT NULL,
    `left_int` int(11) DEFAULT NULL,
    `left_double` double DEFAULT NULL,
    `left_datetime` datetime DEFAULT NULL,
    PRIMARY KEY (`left_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

    CREATE TABLE `xw_right` (
    `right_id` int(11) NOT NULL,
    `right_name` varchar(255) DEFAULT NULL,
    `right_int` int(11) DEFAULT NULL,
    PRIMARY KEY (`right_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

2. Write the Flink SQL and start the synchronization task

    CREATE TABLE xw_left (
        left_id int primary key,
        left_int int,
        left_double double,
        left_datetime timestamp
    ) WITH (
        'connector' = 'tidb-cdc',
        'tikv.grpc.timeout_in_ms' = '20000',
        'pd-addresses' = 'localhost:2379',
        'database-name' = 'test',
        'table-name' = 'xw_left'
    );

    CREATE TABLE xw_right (
        right_id int primary key,
        right_name string,
        right_int int
    ) WITH (
        'connector' = 'tidb-cdc',
        'tikv.grpc.timeout_in_ms' = '20000',
        'pd-addresses' = 'localhost:2379',
        'database-name' = 'test',
        'table-name' = 'xw_right'
    );

    CREATE TABLE es_ticdc (
        left_id int,
        left_int int,
        left_double double,
        left_datetime timestamp,
        right_id int,
        right_name string,
        right_int int,
        PRIMARY KEY (left_id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://xxx.xxx.xxx.xxx:9200',
        'index' = 'xw_ticdc_test',
        'username' = 'elastic',
        'password' = 'xxxxxxx'
    );

    insert into es_ticdc
    select * from xw_left left join xw_right on xw_left.left_int = xw_right.right_int;

3. Write test data with a random program
I used JDBC to write data with a random program, simulating insert, update, and delete within one transaction.
The random program generates SQL like this:

    insert into xw_left(left_double,left_datetime,left_int,left_id) values (4186.486028,'2022-05-07 10:24:10',9022,637);
    delete from xw_right where right_id = 27;
    update xw_left set left_double = 9461.560329,left_datetime = '2022-05-07 10:24:10',left_int = 8118 where left_id = 502;
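A minimal sketch of the kind of generator described above: it emits a random mix of INSERT, UPDATE, and DELETE statements against `xw_left` and `xw_right`. Table and column names come from the DDL in step 1; the class name, method names, and value ranges are assumptions for illustration (the actual program that triggered the bug executed these via JDBC in one transaction).

```java
import java.util.Random;

// Hypothetical sketch of the random-data program from step 3.
// It only builds the SQL text; executing it via JDBC is omitted.
public class RandomSqlGenerator {
    private static final Random RANDOM = new Random();

    static String nextStatement() {
        switch (RANDOM.nextInt(3)) {
            case 0: // insert into xw_left with random values
                return String.format(
                        "insert into xw_left(left_double,left_datetime,left_int,left_id) "
                                + "values (%f,'2022-05-07 10:24:10',%d,%d)",
                        RANDOM.nextDouble() * 10000, RANDOM.nextInt(10000), RANDOM.nextInt(1000));
            case 1: // delete a random row from xw_right
                return "delete from xw_right where right_id = " + RANDOM.nextInt(1000);
            default: // update a random row in xw_left
                return String.format(
                        "update xw_left set left_double = %f, left_int = %d where left_id = %d",
                        RANDOM.nextDouble() * 10000, RANDOM.nextInt(10000), RANDOM.nextInt(1000));
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(nextStatement());
        }
    }
}
```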

4. Flink receives an error and the task stops

[Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] INFO org.tikv.cdc.CDCClient - remove regions: [908001] [Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] INFO org.tikv.cdc.RegionCDCClient - close (region: 908001) [Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] INFO org.tikv.cdc.RegionCDCClient - terminated (region: 908001) [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033118172577793, regionId: 908001 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1651890265454 for job 1c4b252107a9b0e0f9968052543d49e8. 
[Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction - snapshotState checkpoint: 2 at resolvedTs: 433033118172577793 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033118434721793, regionId: 908001 [Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0 (f3ad5eb5692d8098ffdc22588ef81932) switched from RUNNING to FAILED with failure cause: org.tikv.common.exception.CodecException: Decode fails: value length is zero at org.tikv.common.codec.TableCodec.decodeObjects(TableCodec.java:47) at com.ververica.cdc.connectors.tidb.table.RowDataTiKVChangeEventDeserializationSchema.deserialize(RowDataTiKVChangeEventDeserializationSchema.java:71) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.flushRows(TiKVRichParallelSourceFunction.java:219) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.readChangeEvents(TiKVRichParallelSourceFunction.java:207) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:136) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

[Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] INFO org.tikv.cdc.CDCClient - remove regions: [] [grpc-default-executor-1] ERROR org.tikv.cdc.RegionCDCClient - region CDC error: region: 908001, error: {} [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033118697127937, regionId: 908001 org.tikv.shade.io.grpc.StatusRuntimeException: UNAVAILABLE: keepalive watchdog timeout at org.tikv.shade.io.grpc.Status.asRuntimeException(Status.java:535) at org.tikv.shade.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:506) at org.tikv.shade.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:577) at org.tikv.shade.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68) at org.tikv.shade.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:786) at org.tikv.shade.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:764) at org.tikv.shade.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.tikv.shade.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) [Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0 
(f3ad5eb5692d8098ffdc22588ef81932). [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033118959009793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033119221153794, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033119483297793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033119745441793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033120007585793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033120269729793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033120531873794, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO 
org.tikv.cdc.CDCClient - handle resolvedTs: 433033120795852801, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033121056161793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033121318305793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033121580449793, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 433033121842593794, regionId: 908001 [grpc-default-executor-1] ERROR org.tikv.cdc.RegionCDCClient - region CDC error: region: 908001, error: {} org.tikv.shade.io.grpc.StatusRuntimeException: UNAVAILABLE: keepalive watchdog timeout at org.tikv.shade.io.grpc.Status.asRuntimeException(Status.java:535) at org.tikv.shade.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:506) at org.tikv.shade.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:577) at org.tikv.shade.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68) at org.tikv.shade.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:786) at org.tikv.shade.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:764) at org.tikv.shade.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at 
org.tikv.shade.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - handle error: org.tikv.shade.io.grpc.StatusRuntimeException: UNAVAILABLE: keepalive watchdog timeout, regionId: 908001 [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - remove regions: [908001] [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.RegionCDCClient - close (region: 908001) [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.RegionCDCClient - terminated (region: 908001) [Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - remove regions: [] [Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0 (07e35d834a6059ea430b4347c7851936) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException at 
org.tikv.cdc.CDCClient.getMinResolvedTs(CDCClient.java:100) at org.tikv.cdc.CDCClient.handleErrorEvent(CDCClient.java:230) at org.tikv.cdc.CDCClient.get(CDCClient.java:92) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.readChangeEvents(TiKVRichParallelSourceFunction.java:199) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:136) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

[Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.tikv.cdc.CDCClient - remove regions: [] [Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0 (07e35d834a6059ea430b4347c7851936). [flink-akka.actor.default-dispatcher-14] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1)#0 f3ad5eb5692d8098ffdc22588ef81932. [flink-akka.actor.default-dispatcher-14] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1)#0 07e35d834a6059ea430b4347c7851936. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, xw_right]], fields=[right_id, right_name, right_int]) (1/1) (f3ad5eb5692d8098ffdc22588ef81932) switched from RUNNING to FAILED on ce68cf29-615b-43ec-8ac9-635c3413c6e4 @ 127.0.0.1 (dataPort=-1). 
org.tikv.common.exception.CodecException: Decode fails: value length is zero at org.tikv.common.codec.TableCodec.decodeObjects(TableCodec.java:47) at com.ververica.cdc.connectors.tidb.table.RowDataTiKVChangeEventDeserializationSchema.deserialize(RowDataTiKVChangeEventDeserializationSchema.java:71) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.flushRows(TiKVRichParallelSourceFunction.java:219) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.readChangeEvents(TiKVRichParallelSourceFunction.java:207) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:136) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task feca28aff5a3958840bee985ee7de4d3_0. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 5 tasks should be restarted to recover the failed task feca28aff5a3958840bee985ee7de4d3_0. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xw_test (1c4b252107a9b0e0f9968052543d49e8) switched from state RUNNING to RESTARTING. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1) (07e35d834a6059ea430b4347c7851936) switched from RUNNING to CANCELING. 
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1) (aa34b3036337e8c17da7a3da3720b15f) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[left_id]) (1/1) (98a70db2d3009b92becade7bc218f818) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 (aa34b3036337e8c17da7a3da3720b15f). [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 (aa34b3036337e8c17da7a3da3720b15f) switched from RUNNING to CANCELING. 
[flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 (aa34b3036337e8c17da7a3da3720b15f). [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[right_id]) (1/1) (ff7e7e6fe5edbbb25bc69f51fe2ea56f) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task ChangelogNormalize(key=[left_id]) (1/1)#0 (98a70db2d3009b92becade7bc218f818). [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[left_id]) (1/1)#0 (98a70db2d3009b92becade7bc218f818) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code ChangelogNormalize(key=[left_id]) (1/1)#0 (98a70db2d3009b92becade7bc218f818). [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task ChangelogNormalize(key=[right_id]) (1/1)#0 (ff7e7e6fe5edbbb25bc69f51fe2ea56f). [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[right_id]) (1/1)#0 (ff7e7e6fe5edbbb25bc69f51fe2ea56f) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-13] INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code ChangelogNormalize(key=[right_id]) (1/1)#0 (ff7e7e6fe5edbbb25bc69f51fe2ea56f). 
[ChangelogNormalize(key=[left_id]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[left_id]) (1/1)#0 (98a70db2d3009b92becade7bc218f818) switched from CANCELING to CANCELED. [ChangelogNormalize(key=[left_id]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for ChangelogNormalize(key=[left_id]) (1/1)#0 (98a70db2d3009b92becade7bc218f818). [ChangelogNormalize(key=[right_id]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[right_id]) (1/1)#0 (ff7e7e6fe5edbbb25bc69f51fe2ea56f) switched from CANCELING to CANCELED. [ChangelogNormalize(key=[right_id]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for ChangelogNormalize(key=[right_id]) (1/1)#0 (ff7e7e6fe5edbbb25bc69f51fe2ea56f). [flink-akka.actor.default-dispatcher-12] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChangelogNormalize(key=[left_id]) (1/1)#0 98a70db2d3009b92becade7bc218f818. [flink-akka.actor.default-dispatcher-12] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChangelogNormalize(key=[right_id]) (1/1)#0 ff7e7e6fe5edbbb25bc69f51fe2ea56f. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, xw_left]], fields=[left_id, left_int, left_double, left_datetime]) (1/1) (07e35d834a6059ea430b4347c7851936) switched from CANCELING to CANCELED. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 07e35d834a6059ea430b4347c7851936. 
[flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 07e35d834a6059ea430b4347c7851936. [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[left_id]) (1/1) (98a70db2d3009b92becade7bc218f818) switched from CANCELING to CANCELED. [jobmanager-future-thread-6] WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 2 from task 98a70db2d3009b92becade7bc218f818 of job 1c4b252107a9b0e0f9968052543d49e8 at ce68cf29-615b-43ec-8ac9-635c3413c6e4 @ 127.0.0.1 (dataPort=-1). [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[right_id]) (1/1) (ff7e7e6fe5edbbb25bc69f51fe2ea56f) switched from CANCELING to CANCELED. [Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 (aa34b3036337e8c17da7a3da3720b15f) switched from CANCELING to CANCELED. 
[Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 (aa34b3036337e8c17da7a3da3720b15f). [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1)#0 aa34b3036337e8c17da7a3da3720b15f. 
[flink-akka.actor.default-dispatcher-12] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Join(joinType=[LeftOuterJoin], where=[(left_int = right_int)], select=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey]) -> NotNullEnforcer(fields=[left_id]) -> Sink: Sink(table=[default_catalog.default_database.es_ticdc], fields=[left_id, left_int, left_double, left_datetime, right_id, right_name, right_int]) (1/1) (aa34b3036337e8c17da7a3da3720b15f) switched from CANCELING to CANCELED. [pool-12-thread-1] WARN org.tikv.common.region.StoreHealthyChecker - store [172.17.0.90:20161] is not reachable [flink-akka.actor.default-dispatcher-10] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 1c4b252107a9b0e0f9968052543d49e8 [pool-11-thread-1] WARN org.tikv.common.region.StoreHealthyChecker - store [172.17.0.90:20161] is not reachable [flink-akka.actor.default-dispatcher-28] INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool - Releasing slot [b5ddb651b71c1109c55fcb3a6de70769]. [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationId: b5ddb651b71c1109c55fcb3a6de70769, jobId: 1c4b252107a9b0e0f9968052543d49e8). [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Remove job 1c4b252107a9b0e0f9968052543d49e8 from job leader monitoring. [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 1c4b252107a9b0e0f9968052543d49e8. 
Disconnected from the target VM, address: '127.0.0.1:0', transport: 'socket' [PermanentBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache [TaskExecutorLocalStateStoresManager shutdown hook] INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager. [TransientBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache [FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\Administrator\AppData\Local\Temp\flink-dist-cache-e472f263-c83b-4df6-8476-d60fac080c69 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:59240 [FileChannelManagerImpl-netty-shuffle shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\Administrator\AppData\Local\Temp\flink-netty-shuffle-1b975fc9-0c41-4c02-9578-e9b106b78dca [FileChannelManagerImpl-io shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\Administrator\AppData\Local\Temp\flink-io-35b95e7f-cce5-4308-8852-56422d1be6dc



**Additional Description**
This problem seems to be caused by the decoder receiving empty data.
![QQ图片20220507112138](https://user-images.githubusercontent.com/45164615/167236313-eb304aff-c421-4a22-903a-9639e1a800f5.png)
I tried setting the `tikv.batch_get_concurrency` parameter of the CDC connector to 1. I also tried setting the parallelism of the Flink environment to 1. The problem still exists.
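Since the stack trace shows `TableCodec.decodeObjects` throwing "Decode fails: value length is zero", one possible workaround is to skip change events whose raw value is empty instead of passing them to the decoder. The standalone helper below only illustrates that guard; the class and method names are made up, and a real fix would have to live in the connector's deserialization path (e.g. around `RowDataTiKVChangeEventDeserializationSchema`).

```java
import java.util.Optional;

// Hypothetical guard against the empty values that trigger
// "Decode fails: value length is zero". An empty Optional means
// the event should be skipped rather than decoded.
public class EmptyValueGuard {
    static Optional<byte[]> nonEmptyValue(byte[] value) {
        if (value == null || value.length == 0) {
            return Optional.empty();
        }
        return Optional.of(value);
    }
}
```

Whether silently skipping such events is safe for consistency would need to be confirmed by the maintainers; it merely avoids the hard task failure shown in the logs.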
PatrickRen commented 8 months ago

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under the Flink project with the component tag Flink CDC. Thank you!