confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

MySQL Sink: Unknown table X in information_schema Exception #573

Open rdinkel opened 5 years ago

rdinkel commented 5 years ago

Hello, the source connector is able to read from the MySQL database log and write it to Kafka; I can see the data via KSQL Server. But the sink is not working. If the table "thermostat_data" does not exist, it is created in the demodb schema. This is the sink configuration:

{
  "name": "mysql-sink-thermostat-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "thermostat_data",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/demodb?user=connector&password=XXX",
    "auto.create": "true",
    "insert.mode": "insert",
    "key.converter.schema.registry.url": "http://schema-registry-service:8081",
    "value.converter.schema.registry.url": "http://schema-registry-service:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}

After that, the connector crashes with:

2019-01-17 10:32:06,074 ERROR  ||  WorkerSinkTask{id=mysql-sink-thermostat-data-0} RetriableException from SinkTask:   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Unknown table 'thermostat_data' in information_schema

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:92)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Unknown table 'thermostat_data' in information_schema

In use:
- MySQL server 5.7
- MySQL Connector/J driver 8.0.11 and 8.0.13 (tried both)
- kafka-connect-jdbc-5.1.0
- Docker container: debezium/connect:0.9.0.Beta2

Please help

rdinkel commented 5 years ago

Downgrading the MySQL driver to 5.1.17 fixed the issue for me! :) The 6.x version of the driver causes the same issue. I hope that helps!

eyesmoker commented 5 years ago

I have the same issue. If I downgrade mysql-connector-java from 8.x to 5.17, the sink connector works but the MySQL source connector fails. I need this urgently. Any help?

Caused by: java.lang.NoClassDefFoundError: com/mysql/cj/jdbc/Driver
    at io.debezium.connector.mysql.MySqlConnectorConfig.<clinit>(MySqlConnectorConfig.java:695)
    at io.debezium.connector.mysql.MySqlConnector.validate(MySqlConnector.java:79)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:282)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:538)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:535)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:271)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:220)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

heaje commented 5 years ago

We are seeing the same issue with the Connect JDBC version 5.0.1 and MySQL JDBC driver 8.0.15. Downgrading the JDBC driver to 5.1.25 fixed the issue.

gdtroszak commented 5 years ago

We're indeed seeing the same issue, but downgrading our MySQL driver is not an option. Is there a timeline for a fix?

rmoff commented 5 years ago

Server version: 5.7.25 MySQL Community Server (GPL)
Kafka version: 2.2.0-cp2
MySQL driver: mysql-connector-java-8.0.13

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '
  { "name": "jdbc_sink_01",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "mysql-00-customers",
      "table.name.format": "sink_${topic}",
      "connection.url": "jdbc:mysql://mysql:3306/demo",
      "connection.user": "connect_user",
      "connection.password": "asgard",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none"
    }
  }'

🔴 Fails:

[2019-04-08 09:34:12,120] INFO Checking MySql dialect for existence of table "sink_mysql-00-customers" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-04-08 09:34:12,125] INFO Using MySql dialect table "sink_mysql-00-customers" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-04-08 09:34:12,129] WARN Write of 5 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
java.sql.SQLSyntaxErrorException: Unknown table 'sink_mysql-00-customers' in information_schema
   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
   at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
   at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
   at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)
   at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2965)
   at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2953)
   at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)
   at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:3006)
   at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:717)
   at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:554)
   at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeTable(GenericDatabaseDialect.java:753)
   at io.confluent.connect.jdbc.util.TableDefinitions.get(TableDefinitions.java:62)
   at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:58)
   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)
   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
[2019-04-08 09:34:12,129] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2019-04-08 09:34:12,131] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2019-04-08 09:34:12,132] ERROR WorkerSinkTask{id=jdbc_sink_01-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Unknown table 'sink_mysql-00-customers' in information_schema

   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Unknown table 'sink_mysql-00-customers' in information_schema

   ... 12 more

✅ Same config but with MySQL driver: mysql-connector-java-5.1.47 - works fine, records written to MySQL.


Looking at the DEBUG log output for the connector in both configurations, I see:

DEBUG Querying MySql dialect column metadata for catalog:null schema:demo table:sink_mysql-00-customers (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)

With the v5 driver we then get:

INFO Setting metadata for table "demo"."sink_mysql-00-customers" to Table{name='"demo"."sink_mysql-00-customers"', columns=[Column{'gender', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'id', isPrimaryKey=false, allowsNull=false, sqlType=INT}, Column{'email', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'UPDATE_TS', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}, Column{'comments', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'first_name', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'last_name', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions)

Whilst with the v8 drivers the failure is logged:

[2019-04-08 09:55:21,303] WARN Write of 5 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
java.sql.SQLSyntaxErrorException: Unknown table 'sink_mysql-00-customers' in information_schema
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)
        at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2965)
        at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2953)
        at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)
        at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:3006)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:717)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:554)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeTable(GenericDatabaseDialect.java:753)
        at io.confluent.connect.jdbc.util.TableDefinitions.get(TableDefinitions.java:62)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:58)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-04-08 09:55:21,304] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
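
Both traces fail inside `com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys`, called from `GenericDatabaseDialect.primaryKeyColumns`, and the DEBUG line shows the lookup is made with `catalog:null schema:demo`. In the JDBC `DatabaseMetaData` contract, a `null` catalog means the catalog should not be used to narrow the search. As far as I understand Connector/J's default behaviour, it treats MySQL databases as JDBC catalogs, so the `schema` argument does not narrow the lookup either, and a `null` catalog is only resolved to the current database when `nullCatalogMeansCurrent` is `true`; otherwise the driver appears to walk every database, including `information_schema`, which matches the error text. The sketch below reproduces the shape of that JDBC call and adds a hypothetical variant that passes the connection's current database explicitly. `PrimaryKeyLookup` and its methods are illustrative names, not the connector's code, and whether an explicit catalog fully avoids the error has not been verified here against a live server.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper only; this is NOT the connector's GenericDatabaseDialect code.
final class PrimaryKeyLookup {

    // Same shape as the failing call in the log: getPrimaryKeys(catalog, schema, table).
    // Collects the COLUMN_NAME of every primary-key row the driver returns.
    static List<String> primaryKeyColumns(DatabaseMetaData md, String catalog,
                                          String schema, String table) throws SQLException {
        List<String> columns = new ArrayList<>();
        try (ResultSet rs = md.getPrimaryKeys(catalog, schema, table)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns;
    }

    // Hypothetical variant: pass the connection's current database as the catalog
    // instead of null, so the lookup no longer depends on nullCatalogMeansCurrent.
    static List<String> primaryKeyColumnsInCurrentCatalog(Connection conn, String table)
            throws SQLException {
        return primaryKeyColumns(conn.getMetaData(), conn.getCatalog(), null, table);
    }
}
```

With a live connection, `primaryKeyColumnsInCurrentCatalog(conn, "sink_mysql-00-customers")` would return the table's primary-key column names; since the table in this report was auto-created with `pk.mode` set to `none`, an empty list would be the expected result there.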

rmoff commented 5 years ago

@eyesmoker this issue is about the JDBCSink - please raise another issue and reference this one if you are seeing issues with the JDBCSource that you think are related.

gdtroszak commented 5 years ago

We've managed to find a workaround that doesn't involve downgrading the MySQL JDBC driver. Appending the nullCatalogMeansCurrent=true query parameter to the JDBC connection URL appears to fix this problem for us.

For example, "connection.url": "jdbc:mysql://mysql:3306/demo?nullCatalogMeansCurrent=true"
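If the connection URL is assembled programmatically (for example by a deployment script), the workaround has to be merged into whatever query string is already there, such as the `user`/`password` pair in the original report. A minimal Java sketch, assuming, as the workaround does, that Connector/J picks the parameter up from the URL query string; `MySqlUrlFix` and `withNullCatalogMeansCurrent` are hypothetical names, not part of the connector or the driver.

```java
// Hypothetical helper for applying the nullCatalogMeansCurrent workaround to a JDBC URL.
final class MySqlUrlFix {

    // Connector/J property discussed in this thread.
    static final String PARAM = "nullCatalogMeansCurrent";

    // Appends nullCatalogMeansCurrent=true to the URL's query string.
    // If the parameter is already present (with any value), the URL is returned
    // unchanged so an explicit user setting is respected.
    static String withNullCatalogMeansCurrent(String url) {
        int q = url.indexOf('?');
        if (q < 0) {
            return url + "?" + PARAM + "=true"; // no query string yet
        }
        String query = url.substring(q + 1);
        for (String pair : query.split("&")) {
            if (pair.equals(PARAM) || pair.startsWith(PARAM + "=")) {
                return url;
            }
        }
        // Avoid a double separator when the URL already ends in '?' or '&'.
        String sep = (query.isEmpty() || url.endsWith("&")) ? "" : "&";
        return url + sep + PARAM + "=true";
    }
}
```

For the URL from the original report, `withNullCatalogMeansCurrent("jdbc:mysql://127.0.0.1:3306/demodb?user=connector&password=XXX")` returns `jdbc:mysql://127.0.0.1:3306/demodb?user=connector&password=XXX&nullCatalogMeansCurrent=true`.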

zymergen-glonkar commented 5 years ago

@rmoff Some context, and then a request for guidance on next steps. Context: I believe the issue is caused by this: https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-properties-changed.html

That page lists, at the very end, that the default value of nullCatalogMeansCurrent is now false, so this affects anyone upgrading to MySQL driver 8.x. I think anyone using Debezium on Kafka Connect 5.x is on the latest Debezium release, which uses MySQL driver 8.x. We added this JAR to our Connect build image, and after updating our Connect image to use the MySQL 8.x driver, the JDBC sink stopped working until the change above fixed the issue. Hence I think the next steps would be:

a) update the JDBC connector documentation to describe this, so that others know how to fix it (BTW, the Confluent documentation has been exceptional!), or
b) update the JDBC connector source code to check the condition above (i.e. the driver version) and add the flag automatically, or
c) something else?

Happy to help with either, though I don't know the exact steps and will need guidance :)
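Option b) could look roughly like the following sketch. It assumes the connector opens connections with `DriverManager.getConnection(url, props)` and that extra driver properties can be passed through `props`; `NullCatalogDefault` and its methods are hypothetical names. The version threshold of 6 comes from the reports in this thread (6.x and 8.x drivers fail, 5.1.x works), not from the connector's code, and a value the user already set, in the URL or in the properties, is left alone.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical sketch of option b); not actual kafka-connect-jdbc code.
final class NullCatalogDefault {

    static final String PROPERTY = "nullCatalogMeansCurrent";

    // Returns a copy of the configured properties with nullCatalogMeansCurrent=true
    // added for MySQL URLs on driver major version 6 or later, unless already set.
    static Properties withNullCatalogDefault(String url, int driverMajorVersion,
                                             Properties configured) {
        Properties props = new Properties();
        props.putAll(configured); // copy so the caller's Properties are not mutated
        boolean mysql = url.startsWith("jdbc:mysql:");
        // Simple substring check on the URL; good enough for a sketch.
        boolean alreadySet = props.containsKey(PROPERTY) || url.contains(PROPERTY + "=");
        if (mysql && driverMajorVersion >= 6 && !alreadySet) {
            props.setProperty(PROPERTY, "true");
        }
        return props;
    }

    // Looks up the driver registered for the URL and uses its major version.
    // Requires the MySQL driver JAR on the classpath at runtime.
    static Properties forRegisteredDriver(String url, Properties configured)
            throws SQLException {
        Driver driver = DriverManager.getDriver(url);
        return withNullCatalogDefault(url, driver.getMajorVersion(), configured);
    }
}
```

Because the 5.1 driver already defaults this property to `true`, gating on the version mostly serves to keep older setups byte-for-byte unchanged; adding the property unconditionally for MySQL URLs would be a simpler alternative.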