snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

Connector fails to create table #80

Closed meinac closed 4 years ago

meinac commented 4 years ago

Hi, first of all, thanks a lot for building and maintaining this connector; it will probably help lots of developers and companies like us. I am trying to replicate data from a PostgreSQL database to Snowflake, using Debezium as the source and this connector as the sink. The Debezium part works flawlessly, as it is a bit more mature, but I am having problems with the Snowflake sink connector. According to the logs, it cannot create a table because the session does not have a current database, even though I set the database name when configuring the connector.

Here is the full log:

[SF_KAFKA_CONNECTOR] Creating new table test_source_public_items_196930796. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1)
[SF_KAFKA_CONNECTOR] Exception: Failed to prepare SQL statement
[SF_KAFKA_CONNECTOR] Error Code: 2001
[SF_KAFKA_CONNECTOR] Detail: SQL Exception, reported by Snowflake JDBC
[SF_KAFKA_CONNECTOR] Message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:139)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:495)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:372)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:505)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:249)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:187)
[SF_KAFKA_CONNECTOR] net.snowflake.client.core.SFStatement.execute(SFStatement.java:796)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:308)
[SF_KAFKA_CONNECTOR] net.snowflake.client.jdbc.SnowflakePreparedStatementV1.execute(SnowflakePreparedStatementV1.java:509)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:65)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.createTable(SnowflakeConnectionServiceV1.java:79)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.createTableAndStage(SnowflakeSinkServiceV1.java:681)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.init(SnowflakeSinkServiceV1.java:231)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.insert(SnowflakeSinkServiceV1.java:315)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1$ServiceContext.access$100(SnowflakeSinkServiceV1.java:172)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1.insert(SnowflakeSinkServiceV1.java:77)
[SF_KAFKA_CONNECTOR] java.util.ArrayList.forEach(ArrayList.java:1257)
[SF_KAFKA_CONNECTOR] com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:195)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
[SF_KAFKA_CONNECTOR] org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
[SF_KAFKA_CONNECTOR] java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[SF_KAFKA_CONNECTOR] java.util.concurrent.FutureTask.run(FutureTask.java:266)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[SF_KAFKA_CONNECTOR] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[SF_KAFKA_CONNECTOR] java.lang.Thread.run(Thread.java:748)

And here is the configuration for the connector, in case it helps:

{
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "name": "test_sink",
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "topics": [
    "test_source.public.items"
  ],
  "snowflake.url.name": "....",
  "snowflake.user.name": "kafka_connect",
  "snowflake.private.key": "....",
  "snowflake.database.name": "kafka_connect_test",
  "snowflake.schema.name": "kafka_schema",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
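Since the error says the session has no current database, one cheap sanity check before submitting a configuration like the one above is to confirm that the Snowflake settings are present and non-empty. The following is only a minimal sketch: the `REQUIRED_SETTINGS` tuple and the `missing_settings` helper are hypothetical names, and the list of keys is simply copied from this configuration, not taken from the connector's own validation logic.

```python
# Assumption: this list mirrors the snowflake.* keys used in the configuration
# above; it is not the connector's official list of required settings.
REQUIRED_SETTINGS = (
    "snowflake.url.name",
    "snowflake.user.name",
    "snowflake.private.key",
    "snowflake.database.name",
    "snowflake.schema.name",
)


def missing_settings(config):
    """Return the required keys that are absent, non-string, or blank."""
    missing = []
    for key in REQUIRED_SETTINGS:
        value = config.get(key)
        if not isinstance(value, str) or not value.strip():
            missing.append(key)
    return missing
```

An empty result only means the settings reached this check intact; it does not prove that the tool used to submit the configuration forwards them unchanged.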

I am using version 0.5.4 of the Snowflake sink connector.

meinac commented 4 years ago

I was able to create the connector by using the Kafka Connect REST API; maybe the Connect Control Center was messing something up.
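For anyone hitting the same error, a rough sketch of creating the connector through the Kafka Connect REST API could look like the following. The REST API accepts a `POST /connectors` request whose body holds a `name` and a `config` map of string values. The helper names `build_connector_request` and `create_connector`, and the worker address `http://localhost:8083` in the usage comment, are assumptions for illustration and are not taken from this thread.

```python
import json
import urllib.request


def build_connector_request(name, settings):
    """Wrap connector settings in the body shape used by POST /connectors."""
    config = {}
    for key, value in settings.items():
        if key == "name":
            continue  # the name is sent at the top level of the body
        if isinstance(value, (list, tuple)):
            # Connect config values are strings; lists become comma-separated.
            value = ",".join(str(item) for item in value)
        config[key] = str(value)
    return {"name": name, "config": config}


def create_connector(connect_url, payload):
    """POST the payload to a Kafka Connect worker and return its JSON reply."""
    request = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage (assumed worker address, not from this thread):
# payload = build_connector_request("test_sink", settings)
# create_connector("http://localhost:8083", payload)
```

Building the body separately from sending it makes it easy to print the exact JSON and compare it with what a UI would have submitted.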