airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

MySQL CDC throws DebeziumException #41948

Open jsvgjsvg opened 2 months ago

jsvgjsvg commented 2 months ago

Connector Name

source-mysql

Connector Version

3.5.0

What step the error happened?

During the sync

Relevant information

With an AWS RDS MySQL 8 read replica and Airbyte 0.63.6, I get a warning and an error from Debezium stating that schemas do not match. I tried downgrading to source-mysql 3.1.1, but the sync failed with the same error. I also tried increasing the initial wait time, without any luck. This happens on the initial sync as well as on subsequent syncs.

MySQL is set up with the ROW binlog format, full binlog row images, and a 10-day binlog retention period. I wanted to move to binlog-based syncs as recommended, but got stuck on these exceptions when trying to sync two sample tables.

Thank you for any help!

Relevant log output


> WARN pool-2-thread-1 i.d.c.m.MySqlConnectorTask(validateAndLoadSchemaHistory):320 Database schema history was not found but was expected
> INFO pool-2-thread-1 i.d.c.m.s.AbstractConnectorConnection(isBinlogPositionAvailable):352 GTID Set retained: '39ef5909-...:1-232'
> INFO pool-2-thread-1 i.d.c.m.s.AbstractConnectorConnection(isBinlogPositionAvailable):356 The current GTID set '39ef5909-...:1-232' does not contain the GTID set '39ef5909-...:1-232' required by the connector
> INFO pool-2-thread-1 i.d.c.m.s.AbstractConnectorConnection(isBinlogPositionAvailable):362 Server has already purged '39ef5909-...:1-163' GTIDs
> INFO pool-2-thread-1 i.d.c.m.s.AbstractConnectorConnection(isBinlogPositionAvailable):365 GTIDs known by the server but not processed yet '39ef5909-...-78921881707', for replication are available only '39ef5909-...-78921881707'
> INFO pool-2-thread-1 i.d.c.m.MySqlConnectorTask(validateAndLoadSchemaHistory):327 The db-history topic is missing but we are in SCHEMA_ONLY_RECOVERY snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.
> INFO pool-2-thread-1 i.d.c.m.MySqlConnectorTask(start):120 Reconnecting after finishing schema recovery

...

> ERROR blc-company-main-host:3306 i.d.c.m.MySqlStreamingChangeEventSource(validateChangeEventWithTable):684 Error processing before of row in table_x because it's different column size with internal schema size 31, but before size 1, restart connector with schema recovery mode.
> ERROR blc-company-main-host:3306 i.d.c.m.MySqlStreamingChangeEventSource(logStreamingSourceState):1046 Error during binlog processing. Last offset stored = {transaction_id=null, ts_sec=1721048478, file=mysql-bin-changelog.001633, pos=25274891, gtids=39:1,6f4:1-155, server_id=194, event=2}, binlog reader near position = mysql-bin-changelog.001633/25274962
> ERROR blc-company-main-host:3306 i.d.p.ErrorHandler(setProducerThrowable):52 Producer failure io.debezium.DebeziumException: Error processing binlog event
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:301) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$30(MySqlStreamingChangeEventSource.java:879) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1263) ~[mysql-binlog-connector-java-0.29.0.jar:0.29.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1089) ~[mysql-binlog-connector-java-0.29.0.jar:0.29.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:648) ~[mysql-binlog-connector-java-0.29.0.jar:0.29.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:949) ~[mysql-binlog-connector-java-0.29.0.jar:0.29.0]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: io.debezium.DebeziumException: Error processing row in table_x, internal schema size 31, but row size 1 , restart connector with schema recovery mode.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.validateChangeEventWithTable(MySqlStreamingChangeEventSource.java:686) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleUpdate$9(MySqlStreamingChangeEventSource.java:728) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:774) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdate(MySqlStreamingChangeEventSource.java:722) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$23(MySqlStreamingChangeEventSource.java:859) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:281) ~[debezium-connector-mysql-2.5.4.Final.jar:2.5.4.Final]
        ... 6 more
Stack Trace: io.debezium.DebeziumException: Error processing binlog event
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:301)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$30(MySqlStreamingChangeEventSource.java:879)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1263)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1089)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:648)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:949)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.debezium.DebeziumException: Error processing row in table_x, internal schema size 31, but row size 1 , restart connector with schema recovery mode.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.validateChangeEventWithTable(MySqlStreamingChangeEventSource.java:686)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleUpdate$9(MySqlStreamingChangeEventSource.java:728)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:774)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdate(MySqlStreamingChangeEventSource.java:722)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$23(MySqlStreamingChangeEventSource.java:859)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:281)
        ... 6 more
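The GTID lines in the log above describe a containment check: the GTIDs the connector still needs must not have been purged from the server's binlogs, otherwise the saved binlog position is unusable. Here is a minimal, hypothetical sketch of that check (not Debezium's actual implementation; it handles only the simple `uuid:lo-hi` form of a GTID set):

```python
# Hypothetical sketch of the availability check Debezium performs in
# isBinlogPositionAvailable: the GTID ranges the connector still needs
# must not overlap the server's purged GTID set (gtid_purged).

def parse_gtid_set(s):
    """Parse 'uuid:a-b:c-d,uuid2:e-f' into {uuid: [(a, b), ...]}."""
    result = {}
    for part in s.split(","):
        pieces = part.strip().split(":")
        uuid, intervals = pieces[0], pieces[1:]
        ranges = []
        for iv in intervals:
            lo, _, hi = iv.partition("-")
            ranges.append((int(lo), int(hi or lo)))
        result[uuid] = ranges
    return result

def is_position_available(required, purged):
    """True if none of the required GTIDs have been purged by the server."""
    req, pur = parse_gtid_set(required), parse_gtid_set(purged)
    for uuid, ranges in req.items():
        for lo, hi in ranges:
            for plo, phi in pur.get(uuid, []):
                if lo <= phi and plo <= hi:  # intervals overlap -> purged
                    return False
    return True

# Mirroring the log: the server purged transactions 1-163, but the
# connector's stored offset still needs transactions in that range.
print(is_position_available("39ef5909:1-232", "39ef5909:1-163"))    # False
print(is_position_available("39ef5909:200-232", "39ef5909:1-163"))  # True
```

When this check fails, the saved offset points into binlog history that no longer exists on the server, which is consistent with the "Server has already purged" message above.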


akashkulk commented 2 months ago

Did you reset the connection after moving to binlog-based syncs? Unfortunately, that might be the only option here.

jsvgjsvg commented 2 months ago

> Did you reset the connection after moving to binlog-based syncs? Unfortunately, that might be the only option here.

I created a completely new source with binlog to test it out :)

jsvgjsvg commented 2 months ago

@akashkulk I have now tested a local Airbyte against a local MySQL (8.3) with the same database schema, and everything worked.

I am wondering whether there is some AWS-specific Debezium configuration that should be done?

jsvgjsvg commented 2 months ago

There should not be anything special about AWS. One clue I received was to investigate why the Debezium schema history topic goes missing, and to check the Debezium database history topic configuration. Based on https://sylhare.github.io/2023/11/07/Debezium-configuration.html, I figured setting retention.ms and retention.bytes to -1 might help?

Does the MySQL source connector with CDC use Kafka with Debezium? If so, can the Kafka configuration be changed? On the other hand, I found old Slack messages stating that Airbyte uses embedded Debezium and that Kafka is not used.

evantahler commented 1 month ago

Some additional context

The official documentation states that when the connector restarts after either a crash or a graceful stop, it starts reading the binlog from a specific position, that is, from a specific point in time. The connector rebuilds the table structures that existed at that point in time by reading the database schema history Kafka topic and parsing all DDL statements up to the point in the binlog where the connector is starting [2].

In your case, since the connector could not rebuild the table schemas from the database schema history topic (it was missing), it had to snapshot the current schema in order to restore the lost schema history.

> Database schema history was not found but was expected.

> The db-history topic is missing but we are in SCHEMA_ONLY_RECOVERY snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.

This means the connector rebuilt the table schemas from a snapshot of the current table schemas, which may not match the schemas as they were before the restart if schema changes have occurred since the connector last ran. That is the reason for the error: the schema the connector is using differs from the schema that was in effect when the binlog change events were written.

> Error processing before of row in table_x because it's different column size with internal schema size 31, but before size 1, restart connector with schema recovery mode.

> Error processing row in table_x, internal schema size 31, but row size 1, restart connector with schema recovery mode.
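The check behind that last error can be sketched in a few lines. This is a hypothetical simplification of what Debezium's `validateChangeEventWithTable` does, not its actual code: a binlog row event is only valid if it carries one value per column of the table schema the connector holds in memory.

```python
# Hypothetical simplification of Debezium's validateChangeEventWithTable:
# the number of values in the binlog row must match the column count of
# the in-memory table schema, or the event cannot be decoded.

class SchemaMismatchError(Exception):
    pass

def validate_change_event(table_columns, row_values, table_name):
    """Raise if the row's value count differs from the known column count."""
    if len(row_values) != len(table_columns):
        raise SchemaMismatchError(
            f"Error processing row in {table_name}, internal schema size "
            f"{len(table_columns)}, but row size {len(row_values)}, "
            "restart connector with schema recovery mode."
        )

# The recovered snapshot schema has 31 columns, but the (older) binlog
# event carries a different shape -- exactly the situation in the log.
schema = [f"col_{i}" for i in range(31)]
try:
    validate_change_event(schema, ["only-one-value"], "table_x")
except SchemaMismatchError as e:
    print(e)
```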

If a schema change is applied to a table, records committed before the change have different structures than records committed after it. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector cannot capture the table, and an error results [3].

With all of the above in mind, the suggestion is to investigate how the database schema history Kafka topic goes missing. All Debezium relational connectors except PostgreSQL require a database schema history topic in order to function, and this topic must be configured with the correct retention policy per the documentation [4]:

The retention policy configures the lifetime of the records of all schema changes applicable to the source database and, more importantly, to the tables whose changes are being captured by the connector. A database's schema can change at any time, and the connector needs to be able to identify what a table's schema was when it receives a change event. It cannot rely on the table's current structure, because it may be processing events that are relatively old and occurred before the current schema was applied. When the connector restarts, it uses this topic to rebuild the table structures in memory, so that when it resumes processing changes from where it left off it knows exactly what each table's schema was at that moment, even if that point in time was minutes, hours, or even days prior [5]. Hence, please investigate why the Debezium schema history topic goes missing, and check the database schema history topic configuration.
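For reference, if you run Debezium yourself (outside Airbyte), schema history storage is controlled by connector properties. A sketch of the Debezium 2.x property names, per the Debezium documentation; the file path and topic names here are illustrative placeholders, not values Airbyte uses:

```properties
# File-backed schema history (embedded engine, no Kafka involved).
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=/tmp/debezium/schema_history.dat

# Kafka-backed alternative (only relevant when Kafka is actually used);
# the backing topic is the one whose retention policy matters.
# schema.history.internal.kafka.bootstrap.servers=broker:9092
# schema.history.internal.kafka.topic=schema-changes.inventory
```

In Debezium 1.x these properties were named `database.history.*`; 2.x renamed them to `schema.history.internal.*`.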

jsvgjsvg commented 1 month ago

@evantahler Thank you for the thorough answer!

Do you know how I could change Debezium settings in Airbyte? I did find this file, for instance, but I wonder whether there is another place where I could adjust the history topic configuration?

evantahler commented 1 month ago

We don't actually use Kafka at all. Debezium was designed to integrate with Kafka, but we stream those records out of the source to our platform instead.