confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Kafka Debezium Postgres source to Postgres JDBC sink fails with error: Value (STRUCT) type doesn't have a mapping to the SQL database column type #661

Open vaibhavpatil123 opened 5 years ago

vaibhavpatil123 commented 5 years ago

The Kafka Debezium Postgres source to Postgres JDBC sink pipeline fails with the error below.

Refer to https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres

I created two connectors, a source and a sink, for the Postgres database.

POST http://localhost:8084/connectors:

```json
{
  "name": "jdbc-sink2",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "jdbc:postgresql://192.168.56.1:5432/sinkinventory?user=postgres&password=postgres",
    "auto.create": "true",
    "insert.mode": "upsert",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable": "true",
    "internal.value.converter.schemas.enable": "true",
    "include.schema.changes": "true",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.changetopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changetopic.regex": "(.*)"
  }
}
```
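Note that this sink config sets transforms.unwrap.* and transforms.changetopic.* properties but never declares the transforms list itself, so Kafka Connect never applies either SMT and the full Debezium envelope reaches the sink unflattened. A minimal sketch of the missing declaration, using only the aliases already present above (UnwrapFromEnvelope is the old name of what later Debezium versions call ExtractNewRecordState):

```json
"transforms": "unwrap,changetopic",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.changetopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changetopic.regex": "(.*)"
```

Also note that RegexRouter requires a transforms.changetopic.replacement value, which this config never supplies.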


{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.user": "postgres", "database.dbname": "postgres", "tasks.max": "1", "database.hostname": "192.168.56.1", "database.password": "postgres", "name": "inventory-connector", "database.server.name": "dbserver1", "database.port": "5432", "schema.whitelist": "inventory", "table.whitelist": "customers", "validate.non.null": "false",

"schemas.enable" :"true"
}
}

An example change event from the topic:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"ss","last_name":"try","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"ss","last_name":"ssssss","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"postgresql","name":"dbserver1","db":"postgres","ts_usec":1560671302316817,"txId":582,"lsn":23878056,"schema":"inventory","table":"customers","snapshot":true,"last_snapshot_record":true,"xmin":null},"op":"u","ts_ms":1560671302371}}


The sink connector fails with the error below:

```
connectssink_1 | at java.lang.Thread.run(Thread.java:748)
connectssink_1 | Caused by: org.apache.kafka.connect.errors.ConnectException: dbserver1.inventory.customers.Value (STRUCT) type doesn't have a mapping to the SQL database column type
connectssink_1 | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1688)
connectssink_1 | at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:216)
connectssink_1 | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1606)
connectssink_1 | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1595)
connectssink_1 | at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:415)
connectssink_1 | at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:454)
connectssink_1 | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1597)
connectssink_1 | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1520)
connectssink_1 | at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:92)
connectssink_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
connectssink_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connectssink_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connectssink_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connectssink_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
connectssink_1 | ... 10 more
connectssink_1 | 2019-06-16 09:27:22,668 ERROR || WorkerSinkTask{id=jdbc-sink2-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
```

vaibhavpatil123 commented 5 years ago

Value (STRUCT) type doesn't have a mapping to the SQL database column type

iguenkinrl commented 5 years ago

Facing the same problem, any resolution?

MashaFomina commented 4 years ago

@iguenkinrl I have the same problem. Did you find the solution to the problem?

vaibhavpatil123 commented 4 years ago

Hi all, check the link below: https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas.enable-requires-schema-and-payload-fields/
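For context, that post explains that with schemas.enable=true the JsonConverter requires every message to be a schema/payload envelope. A minimal sketch of the expected shape (the field names here are just an illustration):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "id"}
    ],
    "optional": false,
    "name": "example"
  },
  "payload": {
    "id": 1
  }
}
```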

jinlunjie commented 4 years ago

I see your payload does have a schema part, so why is it still not working? I am facing the same issue.

My message contains a schema, and I have the value converter's schemas.enable set to true.

kristofferth commented 4 years ago

Please help, I am also facing this issue!

oguzhanaygn commented 4 years ago

I'm facing the same issue with a SQL Server Debezium source and JDBC sink combination. I need the delete.enabled=true option, so I have to use pk.mode=record_key. When I use this, the problem is: kafkaproduce.dbo.products.Value (STRUCT) type doesn't have a mapping to the SQL database column type. Please help me get rid of this problem. Regards.
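A combination that is often suggested for this case (a sketch only, not verified against this exact setup; option availability depends on your Debezium and JDBC sink versions): flatten the value with ExtractNewRecordState, keep tombstone records so the sink can turn them into DELETEs, and take the primary key from the record key:

```json
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"pk.mode": "record_key",
"delete.enabled": "true"
```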

shayannyc25 commented 4 years ago

I get the same error. I am using the Confluent IBMMQSourceConnector to read flat JSON messages from an IBM MQ queue and put them in a topic, then the Confluent JDBC sink connector to read the messages from the topic and insert them into a table in MS SQL Server.

zj5220924 commented 4 years ago

I get the same error for MySQL.

vijay-andaluri commented 4 years ago

Getting the same error.

stevenwilliamsmis commented 4 years ago

I just hit this error today using a Debezium SQL Server CDC source to a JDBC SQL sink. I found another issue that explains how to get around it: https://github.com/confluentinc/kafka-connect-jdbc/issues/585

You need to use the ExtractNewRecordState transformation to flatten the topic data, as sketched below. Here is the documentation on it: https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html
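To make that concrete for the sink config posted at the top of this issue, a sketch (assuming a Debezium version that ships ExtractNewRecordState; pk.mode is added here because upsert mode needs key columns, and record_key takes them from the Debezium key struct):

```json
{
  "name": "jdbc-sink2",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "jdbc:postgresql://192.168.56.1:5432/sinkinventory?user=postgres&password=postgres",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```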

Hope this helps.

NorakGithub commented 5 months ago

> I just hit this error today using a Debezium SQL Server CDC source to a JDBC SQL sink. I found another issue that explains how to get around it: #585
>
> You need to use the ExtractNewRecordState transformation to flatten the topic data. Here is the documentation on it: https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html
>
> Hope this helps.

Thank you @stevenwilliamsmis, this helps.

By the way, that link is dead; here is the new one: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html