confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Array data types are not handled by PostgreSQL JDBC sink connector #835

Open rgannu opened 4 years ago

rgannu commented 4 years ago

PostgreSQL supports many array data types (see, for example, the type OIDs listed in Debezium's `PgOid.java`): https://github.com/debezium/debezium/blob/master/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgOid.java

`PostgreSqlDatabaseDialect.java` doesn't handle any of these array types:

https://github.com/confluentinc/kafka-connect-jdbc/blob/62eee00939b92f86bcc719773527626e307c6f27/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java#L186
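A minimal sketch of how the dialect could map array fields, assuming each one-dimensional array of primitives becomes a PostgreSQL `<element type>[]` column. `ConnectType`, `elementSqlType` and `arraySqlType` are hypothetical names: the enum stands in for Kafka Connect's `Schema.Type` so the example compiles without the Connect API, and the scalar mappings are assumed to follow the choices the dialect already makes for primitive fields (e.g. `INT32` to `INT`, `STRING` to `TEXT`).

```java
import java.util.Optional;

public class PgArrayTypeSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.data.Schema.Type,
    // so this sketch compiles without the Kafka Connect API on the classpath.
    enum ConnectType { INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT }

    // Scalar column type for an element, assumed to mirror the dialect's primitive mappings.
    static Optional<String> elementSqlType(ConnectType type) {
        switch (type) {
            case INT8:
            case INT16:   return Optional.of("SMALLINT");
            case INT32:   return Optional.of("INT");
            case INT64:   return Optional.of("BIGINT");
            case FLOAT32: return Optional.of("REAL");
            case FLOAT64: return Optional.of("DOUBLE PRECISION");
            case BOOLEAN: return Optional.of("BOOLEAN");
            case STRING:  return Optional.of("TEXT");
            case BYTES:   return Optional.of("BYTEA");
            default:      return Optional.empty(); // ARRAY, MAP, STRUCT have no scalar mapping
        }
    }

    // Hypothetical array mapping: append "[]" to the element's column type.
    // Only one-dimensional arrays of primitive elements are handled here.
    static String arraySqlType(ConnectType elementType) {
        return elementSqlType(elementType)
            .map(sql -> sql + "[]")
            .orElseThrow(() -> new IllegalArgumentException(
                elementType + " elements have no PostgreSQL array mapping in this sketch"));
    }

    public static void main(String[] args) {
        System.out.println(arraySqlType(ConnectType.INT16));   // SMALLINT[]
        System.out.println(arraySqlType(ConnectType.STRING));  // TEXT[]
        System.out.println(arraySqlType(ConnectType.FLOAT64)); // DOUBLE PRECISION[]
    }
}
```

Nested arrays, maps, structs and logical types such as `Decimal` (which columns like `numArr`/`varNumArr` would likely need) are deliberately left out; a real fix would also have to bind array values when writing rows, not only create the columns.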

Example row with array columns:

```sql
INSERT INTO array_table (sintArr, intArr, bintArr, textArr, charArr, varcharArr, numArr, varNumArr) VALUES ('{1,2}', '{1,2}', '{1550166368505037572, 1550166368505037572}', '{"one","two","three"}', '{"cone","ctwo","cthree"}', '{"vcone","vctwo","vcthree"}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}');
```
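The INSERT above writes each array as a PostgreSQL array literal (`'{...}'`). As a sketch of that text format, assuming one-dimensional arrays, the hypothetical helper `toArrayLiteral` renders a Java list the same way: elements are comma-separated inside braces, strings are double-quoted with `\` and `"` backslash-escaped, and null elements become an unquoted `NULL`. A sink would more likely bind values through JDBC's `java.sql.Connection.createArrayOf` than build literals by hand; this only illustrates the format.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class PgArrayLiteral {

    // Hypothetical helper: render a one-dimensional list as a PostgreSQL array literal.
    static String toArrayLiteral(List<?> values) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Object v : values) {
            if (v == null) {
                joiner.add("NULL");                 // unquoted NULL marks a null element
            } else if (v instanceof Number || v instanceof Boolean) {
                joiner.add(v.toString());           // numbers and booleans need no quoting
            } else {
                // Quote text, escaping backslashes first and then double quotes
                String escaped = v.toString().replace("\\", "\\\\").replace("\"", "\\\"");
                joiner.add("\"" + escaped + "\"");
            }
        }
        return joiner.toString();                   // an empty list yields "{}"
    }

    public static void main(String[] args) {
        System.out.println(toArrayLiteral(Arrays.asList(1, 2)));                  // {1,2}
        System.out.println(toArrayLiteral(Arrays.asList("one", "two", "three"))); // {"one","two","three"}
        System.out.println(toArrayLiteral(Arrays.asList(1.1, 2.22, 3.333)));      // {1.1,2.22,3.333}
        System.out.println(toArrayLiteral(Arrays.asList("a\"b", null)));          // {"a\"b",NULL}
    }
}
```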

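The sink task then fails with the exception below in the Kafka Connect log. The target table `array_table` is absent, so the connector tries to create it, and `getSqlType` finds no SQL column type for the `ARRAY` schema: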

```text
connect_1          | 2020-04-15 12:39:15,104 INFO   ||  Using PostgreSql dialect table "array_table" absent   [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
connect_1          | 2020-04-15 12:39:15,105 ERROR  ||  WorkerSinkTask{id=unifly-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1          | org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type
connect_1          |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1753)
connect_1          |    at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:221)
connect_1          |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1669)
connect_1          |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1658)
connect_1          |    at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
connect_1          |    at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
connect_1          |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1660)
connect_1          |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1583)
connect_1          |    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
connect_1          |    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
connect_1          |    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:120)
connect_1          |    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
connect_1          |    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect_1          |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect_1          |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect_1          |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect_1          |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect_1          |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect_1          |    at java.base/java.lang.Thread.run(Thread.java:834)
```

deuscapturus commented 4 years ago

https://github.com/confluentinc/kafka-connect-jdbc/pull/805