airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
15.76k stars 4.04k forks source link

Destination MySQL - Unable to create or change a table without a primary key #40232

Closed Ampsyy closed 1 month ago

Ampsyy commented 3 months ago

Connector Name

destination-mysql

Connector Version

1.0.0

What step the error happened?

During the sync

Relevant information

Hi,

Installed AirByte today to allow us to replicate data from MSSQL to MySQL, have been able to setup both connectors successfully however when trying to perform a sync between them. I receive the following from the destination:

java.util.concurrent.CompletionException: java.lang.RuntimeException: java.sql.SQLException: Unable to create or change a table without a primary key, when the system variable 'sql_require_primary_key' is set. Add a primary key to the table or unset this variable to avoid this message. Note that tables without a primary key can cause performance problems in row-based replication, so please consult your DBA before changing this setting.

It seems to be: airbyte_internal._airbyte_destination_state that causes the issue.

Our MySQL server has 'sql_require_primary_key' set to ON which we cannot turn off due to other processes requiring it to be enabled.

Relevant log output

at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction$lambda$1(JdbcDatabase.kt:46) ~[airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.kt:30) ~[airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction(JdbcDatabase.kt:43) ~[airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.executeWithinTransaction(JdbcDestinationHandler.kt:500) ~[airbyte-cdk-db-destinations-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.commitDestinationStates(JdbcDestinationHandler.kt:492) ~[airbyte-cdk-db-destinations-0.33.0.jar:?]
        at io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper.prepareSchemasAndRunMigrations(DefaultTyperDeduper.kt:145) ~[airbyte-cdk-typing-deduping-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.destination.jdbc.JdbcBufferedConsumerFactory.onStartFunction$lambda$2(JdbcBufferedConsumerFactory.kt:237) ~[airbyte-cdk-db-destinations-0.33.0.jar:?]
        at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.kt:12) [airbyte-cdk-dependencies-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.start(AsyncStreamConsumer.kt:113) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.start(SerializedAirbyteMessageConsumer.kt:60) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:408) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:403) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116) [airbyte-cdk-core-0.33.0.jar:?]
        at io.airbyte.integrations.destination.mysql.MySQLDestination.main(MySQLDestination.java:191) [io.airbyte.airbyte-integrations.connectors-destination-mysql.jar:?]
Stack Trace: java.sql.SQLSyntaxErrorException: Table 'airbyte_internal._airbyte_destination_state' doesn't exist
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:764)
        at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:648)
        at com.zaxxer.hikari.pool.ProxyStatement.execute(ProxyStatement.java:94)
        at com.zaxxer.hikari.pool.HikariProxyStatement.execute(HikariProxyStatement.java)
        at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction$lambda$1(JdbcDatabase.kt:46)
        at io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.kt:30)
        at io.airbyte.cdk.db.jdbc.JdbcDatabase.executeWithinTransaction(JdbcDatabase.kt:43)
        at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.executeWithinTransaction(JdbcDestinationHandler.kt:500)
        at io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.commitDestinationStates(JdbcDestinationHandler.kt:492)
        at io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper.prepareSchemasAndRunMigrations(DefaultTyperDeduper.kt:145)
        at io.airbyte.cdk.integrations.destination.jdbc.JdbcBufferedConsumerFactory.onStartFunction$lambda$2(JdbcBufferedConsumerFactory.kt:237)
        at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.kt:12)
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.start(AsyncStreamConsumer.kt:113)
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.start(SerializedAirbyteMessageConsumer.kt:60)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:408)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:403)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116)
        at io.airbyte.integrations.destination.mysql.MySQLDestination.main(MySQLDestination.java:191)

Contribute

marcosmarxm commented 3 months ago

@airbytehq/destinations can someone take a look on this? MySQL is a community connector but def this can be relevant for other destinations too.

evantahler commented 3 months ago

@Ampsyy can you please include the full sync log? Also, which source are your using?

Ampsyy commented 3 months ago

@evantahler Attached is the exported sync logs, I'm syncing from a MSSQL server (source-mssql).

To note I've also tried syncing the same source to another MySQL server (one which has 'sql_require_primary_key' set to OFF) and it works great.

default_workspace_job_1_attempt_5_txt.txt

edgao commented 3 months ago

aha, this is the _airbyte_destination_state table. I think you can just create that manually with a composite PK on the name+namespace columns:

create table airbyte_internal._airbyte_destination_state
  name varchar,
  namespace varchar,
  destination_state varchar,
  updated_at datetime,
  PRIMARY KEY (name, namespace)
Ampsyy commented 3 months ago

I did try to create the table manually on my destination MySQL which has primary keys disabled but added an Id column and made that the PK. When the sync ran it seemed to update the table and remove the Id column along with the PK.

I'll try creating and setting the name and namespace as the PK

evantahler commented 3 months ago

Yeah, we do mange the columns on that table, but having an extra index should be OK

Ampsyy commented 1 month ago

Thanks all! Resolved the issue running the following to create the table:

CREATE TABLE `_airbyte_destination_state` (
  `name` varchar(255) NOT NULL,
  `namespace` varchar(255) NOT NULL,
  `destination_state` varchar(255) DEFAULT NULL,
  `updated_at` datetime DEFAULT NULL,
  PRIMARY KEY (`name`,`namespace`)
);