airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
14.75k stars 3.79k forks source link

Source MySQL: Encountered change event for table whose schema isn't known to this connector #28150

Open alexnikitchuk opened 12 months ago

alexnikitchuk commented 12 months ago

Connector Name

source-mysql

Connector Version

2.0.19

What step the error happened?

During the sync

Revelant information

When we add a new table to the existing connection after a while it fails with Debezium exception

Encountered change event for table dynamodb-clone.BillingHourType whose schema isn't known to this connector

It seems like the connector does not propagate schema for the new table into the state: there is no ddl key for the new table. The workaround is resetting the whole connection whenever you add a new table.

Relevant log output

2023-07-10 19:10:13 [44msource[0m > ERROR i.d.c.m.MySqlStreamingChangeEventSource(informAboutUnknownTableIfRequired):692 Encountered change event 'Event{header=EventHeaderV4{timestamp=1689015633000, eventType=TABLE_MAP, serverId=780484762, headerLength=19, dataLength=555, nextPosition=32152271, flags=0}, data=TableMapEventData{tableId=278, database='dynamodb-clone', table='BillingHourType', columnTypes=-4, -4, 1, 15, -4, -4, 15, 15, 3, 15, 15, 15, -4, 15, 8, 8, 15, 8, 8, 15, 8, 8, 15, 15, 15, 15, 15, 15, 15, 15, 3, 3, 1, 8, 3, 18, 17, 1, columnMetadata=3, 3, 0, 144, 3, 3, 144, 400, 0, 144, 144, 144, 3, 1020, 0, 0, 1020, 0, 0, 1020, 0, 0, 1020, 1020, 1020, 1020, 1020, 1020, 1020, 1020, 0, 0, 0, 0, 0, 0, 6, 0, columnNullability={0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 37}, eventMetadata=TableMapEventMetadata{signedness={}, defaultCharset=255, charsetCollations=null, columnCharsets=null, columnNames=hourType, hourTypeAbbreviation, isActive, parentId, parentSortKey, hierarchy, id, entityType, version, tenantId, tenantCompanyId, partitionKey, sortKey, createdBy, createdDate, createdDateTime, deletedBy, deletedDate, deletedDateTime, lastUpdatedBy, lastUpdatedDate, lastUpdatedDateTime, lsi1, lsi2, lsi3, lsi4, lsi5, gsi1, gsi2, gsi3, rangeKey, _version, _deleted, _lastChangedAt, sortOrder, offlineUpdatedDateTime, lastModifiedUTC, isArchived, setStrValues=null, enumStrValues=null, geometryTypes=null, simplePrimaryKeys=6, primaryKeysWithPrefix=null, enumAndSetDefaultCharset=null, enumAndSetColumnCharsets=null,visibility=null}}}' at offset {transaction_id=null, ts_sec=1689015633, file=mysql-bin-changelog.009578, pos=32151597, server_id=780484762, event=1} for table dynamodb-clone.BillingHourType whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=32151697 --stop-position=32152271 --verbose mysql-bin-changelog.009578
2023-07-10 19:10:13 [44msource[0m > ERROR i.d.c.m.MySqlStreamingChangeEventSource(logStreamingSourceState):1143 Error during binlog processing. Last offset stored = {transaction_id=null, ts_sec=1689015633, file=mysql-bin-changelog.009578, pos=32151597, server_id=780484762, event=1}, binlog reader near position = mysql-bin-changelog.009578/32151697
2023-07-10 19:10:13 [44msource[0m > ERROR i.d.p.ErrorHandler(setProducerThrowable):57 Producer failure io.debezium.DebeziumException: Error processing binlog event
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:392) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$27(MySqlStreamingChangeEventSource.java:923) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246) ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072) ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: io.debezium.DebeziumException: Encountered change event for table dynamodb-clone.BillingHourType whose schema isn't known to this connector
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:697) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:738) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdateTableMetadata(MySqlStreamingChangeEventSource.java:653) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:893) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:372) ~[debezium-connector-mysql-2.2.0.Final.jar:2.2.0.Final]
    ... 6 more

Stack Trace: io.debezium.DebeziumException: Error processing binlog event
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:392)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$27(MySqlStreamingChangeEventSource.java:923)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.debezium.DebeziumException: Encountered change event for table dynamodb-clone.BillingHourType whose schema isn't known to this connector
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:697)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:738)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdateTableMetadata(MySqlStreamingChangeEventSource.java:653)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:893)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:372)
    ... 6 more

Contribute

ifun248 commented 11 months ago

I have encountered problem recently.

Caused by: java.lang.RuntimeException: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.close(DebeziumRecordPublisher.java:106)
    at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15)
    at io.airbyte.integrations.debezium.internals.DebeziumShutdownProcedure.initiateShutdownProcedure(DebeziumShutdownProcedure.java:93)
    ... 34 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:392)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$27(MySqlStreamingChangeEventSource.java:923)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.debezium.DebeziumException: Error processing binlog event
    ... 7 more
Caused by: io.debezium.DebeziumException: Encountered change event for table db.tb_keyword whose schema isn't known to this connector
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:697)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:738)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdateTableMetadata(MySqlStreamingChangeEventSource.java:653)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:893)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:372)
    ... 6 more
,retryable=<null>,timestamp=1691589283467,additionalProperties={}], io.airbyte.config.FailureReason@5082207[failureOrigin=source,failureType=<null>,internalMessage=Source didn't exit properly - check the logs!,externalMessage=Something went wrong within the source connector,metadata=io.airbyte.config.Metadata@1cfba6bc[additionalProperties={attemptNumber=2, jobId=14084, connector_command=read}],stacktrace=io.airbyte.workers.internal.exception.SourceException: Source didn't exit properly - check the logs!
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$readFromSrcAndWriteToDstRunnable$5(DefaultReplicationWorker.java:303)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
    at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:145)
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$readFromSrcAndWriteToDstRunnable$5(DefaultReplicationWorker.java:301)
    ... 4 more
subodh1810 commented 10 months ago

@ifun248 @alexnikitchuk could you share the steps to reproduce this or what exactly were you doing while you got this error?

alexnikitchuk commented 10 months ago

When we add a new table to the existing connection after a while it fails with Debezium exception

This error appears when we add a new table to the existing connection.

subodh1810 commented 10 months ago

@alexnikitchuk does this happen every time you add a new table to an existing connection or was it a one time thing? Was the table newly created or an existing table?

alexnikitchuk commented 10 months ago

@alexnikitchuk does this happen every time you add a new table to an existing connection or was it a one time thing? Was the table newly created or an existing table?

Every time. The error occurs not immediately but after some time. Not sure what triggers it. It is an existing table

subodh1810 commented 10 months ago

I see! And when you add the table, does the Airbyte user that you use to sync have access to these tables from the start or do you first grant access to the table, refresh the Airbtye UI and then select the new table? Basically walk me through the entire process of adding a new table to an already running sync. I am trying to repro this

alexnikitchuk commented 10 months ago

gotcha, so we limit the tables Airbyte user has access to to workaround another issue https://github.com/airbytehq/airbyte/issues/24650 And when we need to sync another table we grant select on it to the user, click refresh on connection page on UI and select the new table

subodh1810 commented 10 months ago

Thank you @alexnikitchuk . I have been able to reproduce this error. The steps I followed :

  1. Create a new airbyte user and grant it SELECT access only on 1 table.
  2. Run a sync with this airbyte user with that 1 table selected in UI
  3. Change the permission of the airbyte user and grant it access to another table.
  4. Refresh the Airbyte UI catalog page and select the new table.
  5. Run a sync
  6. Insert a new record in the database in the newly selected table
  7. Run a sync -> This sync will generate the error.

The error comes while trying to parse the binlog for the newly added table.

ifun248 commented 9 months ago

@alexnikitchuk does this happen every time you add a new table to an existing connection or was it a one time thing? Was the table newly created or an existing table?

@subodh1810 For me, this happens sometimes when add a new table to an existing connection, which may come from Debezium. Ref: https://stackoverflow.com/questions/75518352/debezium-mysql-connector-error-encountered-change-event-for-table-whose-schema

akashkulk commented 8 months ago

@alexnikitchuk the workaround you needed for https://github.com/airbytehq/airbyte/issues/24650 should no longer be needed, since we are compressing schema history. With that change, can you remove the limitation of tables the Airbyte user has access to mentioned above.

jdarrahSNT commented 5 months ago

Hello,

I just wanted to comment that we have seen this error as well but with a slightly different setup. We have a user with SELECT grants to the entire database we are syncing. We sometimes create a new table but don't populate it immediately with data. Several days later when we insert the first rows, we get the same error (note I replaced the database and table with X:

"trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:392)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$27(MySqlStreamingChangeEventSource.java:923)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.debezium.DebeziumException: Error processing binlog event
        ... 7 more
Caused by: io.debezium.DebeziumException: Encountered change event for table XXX.XXXX whose schema isn't known to this connector
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:697)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired(MySqlStreamingChangeEventSource.java:738)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdateTableMetadata(MySqlStreamingChangeEventSource.java:653)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:893)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:372)
        ... 6 more
"