apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

Flink CDC 2.3.0: when consuming data with a SPECIFIC_OFFSETS startup position, binlog data cannot be consumed correctly if the table structure was changed after the starting offset. #1962

Closed · wallkop closed this issue 1 year ago

wallkop commented 1 year ago

Search before asking

Flink version

1.14.5

Flink CDC version

2.3.0

Database and its version

MySQL 5.7.37

Minimal reproduce step

1. `SHOW MASTER STATUS`
Get the current GTID set: cacec830-00de-11ed-bddc-525400b586f9:1-358. We will later try to consume the binlog of the tbl_test table by setting specificOffset to this GTID set.

2. `INSERT INTO test_split_1.tbl_test (id) VALUES (2)`
Insert a record into the tbl_test table.

3. `ALTER TABLE test_split_1.tbl_test ADD COLUMN t varchar(255) NOT NULL AFTER v;`
Modify the table structure by adding a new column.

4. `INSERT INTO test_split_1.tbl_test (id) VALUES (3)`
Insert another record into the tbl_test table.

5. `StartupOptions.specificOffset("cacec830-00de-11ed-bddc-525400b586f9:1-358")`
Set specificOffset and try to consume the binlog from before the table structure change (a sketch of the source configuration follows the list).

6. CDC throws an error and cannot recover.
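
For reference, the source used in step 5 was configured roughly as follows. This is a minimal sketch rather than the exact job: hostname, credentials, and the deserializer are placeholders, and it assumes the flink-connector-mysql-cdc 2.3.0 builder API.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SpecificOffsetRepro {
    public static void main(String[] args) throws Exception {
        // Start reading from the GTID set captured in step 1,
        // i.e. from before the ALTER TABLE in step 3.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")              // placeholder
                .port(3306)
                .databaseList("test_split_1")
                .tableList("test_split_1.tbl_test")
                .username("root")                   // placeholder
                .password("***")                    // placeholder
                .startupOptions(StartupOptions.specificOffset(
                        "cacec830-00de-11ed-bddc-525400b586f9:1-358"))
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
                .print();
        env.execute("specific-offset-repro");
    }
}
```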

What did you expect to see?

The CDC program consumes the binlog normally from the specified GTIDs

What did you see instead?

The CDC program throws an error:

2023-03-06 11:30:04,454 ERROR [io.debezium.pipeline.ErrorHandler] - Producer failure
com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more
2023-03-06 11:30:04,456 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.
2023-03-06 11:30:04,531 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Keepalive thread is running
2023-03-06 11:30:04,772 ERROR [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    ... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
    ... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more
2023-03-06 11:30:04,773 INFO [com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader] - Close current debezium reader com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader
2023-03-06 11:30:04,784 INFO [io.debezium.jdbc.JdbcConnection] - Connection gracefully closed
2023-03-06 11:30:04,785 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Stopped reading binlog after 0 events, no new offset was recorded
2023-03-06 11:30:04,786 INFO [org.apache.flink.connector.base.source.reader.SourceReaderBase] - Closing Source Reader.
2023-03-06 11:30:04,786 INFO [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher] - Shutting down split fetcher 0
2023-03-06 11:30:04,837 WARN [com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader] - Failed to close the binlog split reader in 30 seconds.
2023-03-06 11:30:04,837 INFO [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher] - Split fetcher 0 exited.
2023-03-06 11:30:04,844 WARN [org.apache.flink.runtime.taskmanager.Task] - Source: mysql-cdc-source (1/1)#0 (e98fb9c665c2e2b107663f21b343ae50) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    ... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
    ... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more

2023-03-06 11:30:04,844 INFO [org.apache.flink.runtime.taskmanager.Task] - Freeing task resources for Source: mysql-cdc-source (1/1)#0 (e98fb9c665c2e2b107663f21b343ae50).
2023-03-06 11:30:04,858 INFO [org.apache.flink.runtime.taskexecutor.TaskExecutor] - Un-registering task and sending final execution state FAILED to JobManager for task Source: mysql-cdc-source (1/1)#0 e98fb9c665c2e2b107663f21b343ae50.
2023-03-06 11:30:04,866 INFO [org.apache.flink.runtime.executiongraph.ExecutionGraph] - Source: mysql-cdc-source (1/1) (e98fb9c665c2e2b107663f21b343ae50) switched from RUNNING to FAILED on cdc74e39-140c-47bb-868b-7f58874b0c14 @ localhost (dataPort=-1).
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    ... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
    ... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more

Anything else?

In real business scenarios, it is very common that the table structure has changed after the starting binlog offset when recovering data with SPECIFIC_OFFSETS. If this issue is not resolved, recovering data from SPECIFIC_OFFSETS becomes much less useful.

After reading the source code of Flink CDC and Debezium, I found that when recovering data based on SPECIFIC_OFFSETS, the table structure is still obtained from the latest schema. To solve this problem, I believe it is necessary to obtain a schema snapshot as of the starting binlog position. This could be achieved with the following approach:

1. First, obtain the latest schema.
2. Consume the binlog from the starting position; if the row format of a binlog event is inconsistent with the obtained schema, enter a historical schema recovery logic (this is where the current source code throws an exception).
3. Replay the binlog and retrieve only schema-change events to restore the schema to the historical snapshot corresponding to the starting binlog position.
4. Based on that historical schema snapshot, start consuming from the starting binlog position again, and capture schema-change events along the way to keep the schema structure up to date.

The biggest problem with this approach is that recovering the historical schema snapshot could take a long time. Perhaps there is a better solution available?
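
To make the idea concrete, here is a purely illustrative sketch (Java 17) of the replay logic. BinlogEvent, DdlEvent, RowEvent, and TableSchema are hypothetical stand-ins, not Debezium or Flink CDC classes; a real implementation would work against the connector's schema history instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative-only sketch of the proposed "historical schema snapshot" recovery. */
public class SchemaReplaySketch {

    interface BinlogEvent { long position(); }
    record DdlEvent(long position, String ddl) implements BinlogEvent {}
    record RowEvent(long position, int columnCount) implements BinlogEvent {}

    /** Hypothetical schema model that only tracks the column count and can apply ADD COLUMN. */
    static final class TableSchema {
        int columnCount;
        TableSchema(int columnCount) { this.columnCount = columnCount; }
        void applyDdl(String ddl) {
            if (ddl.toUpperCase().contains("ADD COLUMN")) columnCount++;
        }
        boolean matches(RowEvent row) { return row.columnCount() == columnCount; }
    }

    /**
     * Steps 1, 2 and 4: start from the latest schema, stream from the start offset, and on the
     * first row/schema mismatch (where the current connector throws) rebuild the schema as of
     * the start offset and restart the scan from it (step 3).
     */
    static List<RowEvent> consumeFrom(List<BinlogEvent> fullBinlog, long startOffset, TableSchema latestSchema) {
        TableSchema schema = latestSchema;
        List<RowEvent> emitted = new ArrayList<>();
        boolean recovered = false;

        for (int i = 0; i < fullBinlog.size(); i++) {
            BinlogEvent e = fullBinlog.get(i);
            if (e.position() < startOffset) continue;              // skip events before the start offset
            if (e instanceof DdlEvent ddl) {
                schema.applyDdl(ddl.ddl());                        // step 4: apply DDL seen while streaming
            } else if (e instanceof RowEvent row) {
                if (schema.matches(row)) {
                    emitted.add(row);
                } else if (!recovered) {
                    schema = rebuildSchemaAt(fullBinlog, startOffset);  // step 3
                    recovered = true;
                    emitted.clear();
                    i = -1;                                        // restart the scan from the start offset
                } else {
                    throw new IllegalStateException("schema still out of sync at offset " + row.position());
                }
            }
        }
        return emitted;
    }

    /** Step 3: replay only the DDL events before the start offset to rebuild the historical schema. */
    static TableSchema rebuildSchemaAt(List<BinlogEvent> fullBinlog, long startOffset) {
        TableSchema schema = new TableSchema(1);                   // hypothetical baseline: original one-column table
        for (BinlogEvent e : fullBinlog) {
            if (e.position() >= startOffset) break;
            if (e instanceof DdlEvent ddl) schema.applyDdl(ddl.ddl());
        }
        return schema;
    }

    public static void main(String[] args) {
        // Mirror of the reproduce steps: insert, ALTER TABLE ADD COLUMN, insert again,
        // then start reading from offset 100 with the latest (two-column) schema.
        List<BinlogEvent> binlog = List.of(
                new RowEvent(120, 1),
                new DdlEvent(130, "ALTER TABLE tbl_test ADD COLUMN t varchar(255)"),
                new RowEvent(140, 2));
        System.out.println(consumeFrom(binlog, 100, new TableSchema(2)).size()); // prints 2
    }
}
```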

Are you willing to submit a PR?

ruanhang1993 commented 1 year ago

This is expected behavior, which is documented in the MySQL CDC connector doc.

If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.
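
For reference, the startup modes that doc passage refers to map to these StartupOptions factory methods. This is a minimal sketch; the method names are assumed from the 2.3.x API, and the argument values are placeholders taken from this issue.

```java
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupModes {
    public static void main(String[] args) {
        // Modes that replay binlog history; they can fail as described above
        // if the table schema changed after the chosen start point:
        StartupOptions fromEarliest = StartupOptions.earliest();
        StartupOptions fromGtidSet  = StartupOptions.specificOffset("cacec830-00de-11ed-bddc-525400b586f9:1-358");
        StartupOptions fromFilePos  = StartupOptions.specificOffset("mysql-bin.000008", 80481L); // binlog file + position
        StartupOptions fromTime     = StartupOptions.timestamp(1678072845000L);                  // epoch millis

        // Modes that start from a snapshot or the latest offset, and therefore
        // only read changes produced under the current schema:
        StartupOptions snapshotFirst = StartupOptions.initial();
        StartupOptions latestOnly    = StartupOptions.latest();
    }
}
```
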
ruanhang1993 commented 1 year ago

#1724 is the PR about this part.

HoboLegion commented 1 year ago

Flink 1.14.5
Flink CDC 2.3.0. This can be reproduced reliably: when restarting from a specific offset, if a DDL statement exists after that offset (e.g. adding a new column), the problem above occurs.

2023-03-14 11:19:41,676 WARN com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Failed to close the binlog split reader in 30 seconds.
2023-03-14 11:19:41,676 WARN org.apache.flink.runtime.taskmanager.Task - Source: MySQLSource -> Sink: Print to Std. Out (5/12)#1 (eaed28dabed1ff239d50be1f7fd732c7) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
    ... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to TIMESTAMP
    at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
    ... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678758780, file=, pos=378512, row=1, server_id=1, event=2}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleUpdate$6(MySqlStreamingChangeEventSource.java:817)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdate(MySqlStreamingChangeEventSource.java:809)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$18(MySqlStreamingChangeEventSource.java:989)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
    ... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:100)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:51)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more

Process finished with exit code -1

wallkop commented 1 year ago

This is expected behavior, which is documented in the MySQL CDC connector doc.

If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.

Thank you very much, @ruanhang1993. I understand your point, so I removed the bug label, but I did not close this issue. I believe this should be an enhancement. In the Flink CDC DingTalk community, multiple users have also reported this issue. LeonardBang suggested creating an issue to record and attempt to resolve it.

Hello, @leonardBang, is there a plan to solve this issue?

ltylty commented 1 year ago

Is this fixed? I have the same problem.

maikouliujian commented 2 months ago

Is this fixed? I have the same problem.