confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
21 stars 960 forks source link

Sink Connector with Snowflake - Tables not Found #1136

Open brunocostalopes opened 3 years ago

brunocostalopes commented 3 years ago

Hi,

I've setup Kafka locally and I'm trying to use the JDBC Sink Connector to write data into Snowflake but I keep getting the error below suggesting that the table can't be found in the database (I have auto creation disabled since it's not supported with Snowflake).

[2021-11-08 18:52:19,300] INFO [test-connector-jdbc|task-0] Checking Generic dialect for existence of TABLE "test_database"."test_schema"."test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:575)
[2021-11-08 18:52:19,455] INFO [test-connector-jdbc|task-0] Using Generic dialect TABLE "test_database"."test_schema"."test_table" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:583)
[2021-11-08 18:52:19,572] ERROR [test-connector-jdbc|task-0] Error encountered in task test-connector-jdbc-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "test_database"."test_schema"."test_table" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:118)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:133)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

I can see the following query being run from the JDBC driver in Snowflake (and running it directly in Snowflake, with the same user configured in the JDBC connector, does return data.

show /* JDBC:DatabaseMetaData.getTables() */ tables like 'TEST_TABLE' in account

And finally my configuration below:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://######.eu-west-1.snowflakecomputing.com/?db=test_database&schema=test_schema&role=role",
    "connection.user": "test_user",
    "connection.password": "test_password",
    "topics.regex": "test_table",
    "table.name.format": "test_database.test_schema.test_table_${topic}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq",
    "errors.deadletterqueue.topic.replication.factor":"1",
    "errors.log.enable":"true",
    "quote.sql.identifiers":"never"

}

Any idea what I might be missing here?

OneCricketeer commented 2 years ago

Is there a specific reason you need to use the JDBC Connector?

https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector

brunocostalopes commented 2 years ago

I wanted to be able to use some of the functionalities available in the JDBC connector such as auto table creation, auto schema evolution and use of upserts, but I have since realised that none of these are available with Snowflake.

The only advantage for us ended up being that the JDBC connector can write the data directly into tables with a well defined schema, rather than just loading the data into VARIANT fields on the Snowflake side, like the Snowflake connector does.

In any case, regarding this issue in particular, the issue was that the JDBC connector uses getTables() to get the list of tables it is case sensitive.

https://stackoverflow.com/questions/69890973/kafka-jdbc-sink-connector-cant-find-tables-in-snowflake https://stackoverflow.com/questions/62224430/kafka-connect-jdbc-sink-quote-sql-identifiers-not-working/62232384#62232384