confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Postgresql sink not setting schema correctly #246

Open curtisq opened 7 years ago

curtisq commented 7 years ago

When creating a JDBC Sink connector using postgres with the following config

{
  "name":"pg-sink-connector",
  "config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":1,
    "topics":"unagi-sink",
    "connection.url":"jdbc:postgresql://postgres:5432/unagi",
    "connection.user":"unagi",
    "connection.password:"...",
    "auto.create":"true",
    "auto.evolve":"true",
    "batch.size":10,
    "table.name.format":"unagi"
  }
}

and reading from a topic whose key and value both use the following Avro schema

{"namespace": "com.squarespace.unagi.avro",
 "type": "record",
 "name": "UnagiDoc",
 "fields": [
     {"name": "id",   "type":["string", "null"], "default": "unknown"},
     {"name": "data",   "type":["string", "null"], "default": "unknown"}
 ]
}

I run into a problem where the connector thinks the table doesn't exist, creates it, then decides the newly created table has no columns, so it tries to amend it and blows up. If the table already exists to begin with, it fails when trying to create it.

I believe this is due to there being no way to set the schema: the connector writes to and amends the table in the public schema, but it seems to think the Postgres schema it should use when checking for table/column existence is "$user".
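
For context, "$user" is the first entry in PostgreSQL's default search_path, and it only resolves to a real schema when one named after the login role exists. A quick psql sketch (assuming a stock server configuration):

-- Show the schema search order for the current session; the stock default is: "$user", public
SHOW search_path;

-- "$user" resolves to a schema named after the login role (here that would be unagi),
-- which presumably does not exist in this database, leaving only public usable
SELECT current_schemas(true);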

Here is some log output from the connect process and postgres running in docker-compose:

kafka-connect_1    | 2017-07-19 22:08:20,285 INFO   ||  Checking table:unagi exists for product:PostgreSQL schema:$user catalog:   [io.confluent.connect.jdbc.sink.DbMetadataQueries]
kafka-connect_1    | 2017-07-19 22:08:20,313 INFO   ||  product:PostgreSQL schema:$user catalog:unagi -- table:unagi is absent   [io.confluent.connect.jdbc.sink.DbMetadataQueries]
kafka-connect_1    | 2017-07-19 22:08:20,316 INFO   ||  Creating table:unagi with SQL: CREATE TABLE "unagi" (
kafka-connect_1    | "data" TEXT NULL,
kafka-connect_1    | "id" TEXT NULL)   [io.confluent.connect.jdbc.sink.DbStructure]
kafka-connect_1    | 2017-07-19 22:08:20,382 INFO   ||  Querying column metadata for product:PostgreSQL schema:$user catalog:unagi table:unagi   [io.confluent.connect.jdbc.sink.DbMetadataQueries]
kafka-connect_1    | 2017-07-19 22:08:20,395 INFO   ||  Updating cached metadata -- DbTable{name='unagi', columns={}}   [io.confluent.connect.jdbc.sink.metadata.TableMetadataLoadingCache]
kafka-connect_1    | 2017-07-19 22:08:20,397 INFO   ||  Amending table to add missing fields:[SinkRecordField{schema=Schema{STRING}, name='data', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='id', isPrimaryKey=false}] maxRetries:10 with SQL: [ALTER TABLE "unagi"
kafka-connect_1    | ADD "data" TEXT NULL,
kafka-connect_1    | ADD "id" TEXT NULL]   [io.confluent.connect.jdbc.sink.DbStructure]
postgres_1         | ERROR:  column "data" of relation "unagi" already exists
postgres_1         | STATEMENT:  ALTER TABLE "unagi"
postgres_1         |    ADD "data" TEXT NULL,
postgres_1         |    ADD "id" TEXT NULL
kafka-connect_1    | 2017-07-19 22:08:20,400 WARN   ||  Amend failed, re-attempting   [io.confluent.connect.jdbc.sink.DbStructure]
kafka-connect_1    | org.postgresql.util.PSQLException: ERROR: column "data" of relation "unagi" already exists

Notice product:PostgreSQL schema:$user catalog: in the output above; the schema used for the metadata lookups is the literal string "$user".

When I go into Postgres, manually create a schema named "$user", and create the unagi table, everything works fine; the connector sees the schema and writes to the public schema.
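
For reference, that manual step is roughly the following (a sketch; the quoted identifier creates a schema whose name is the literal string $user, matching the schema string the connector logs for its metadata lookups):

-- A schema literally named $user (the quotes prevent any role-name substitution)
CREATE SCHEMA "$user";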

Is there some way to specify the schema to read from, or am I using this connector wrong?

jzk commented 6 years ago

I ran into the same issue this afternoon and found one solution here: basically, you can try setting the current schema in the JDBC connection URL. Not ideal; I wish this were supported by Connect itself.
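
As a sketch of that suggestion, assuming the PostgreSQL JDBC driver's currentSchema connection parameter, the only change to the config from the original report would be the connection URL:

  "connection.url":"jdbc:postgresql://postgres:5432/unagi?currentSchema=public",

The idea being that the connection then reports public as its current schema, so the existence/column checks line up with the schema the table is actually created in.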

arupKpanja commented 4 years ago

I ran into the same issue this afternoon and found one solution here: basically, you can try setting the current schema in the JDBC connection URL. Not ideal; I wish this were supported by Connect itself.

This worked for us. Thanks!