confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

New version of Postgresql driver can not support partitioned tables #1092

Open echang0929 opened 3 years ago

echang0929 commented 3 years ago

I upgraded our Confluent Community Package Helm cluster from 5.4.1 to 6.0.1 and installed the JDBC plugin:

confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-jdbc:10.2.0

However, the new cluster cannot handle partitioned tables:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "recon"."tracking_batch_pipeline_orig" is missing and auto-creation is disabled
    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:115)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
    ... 10 more

So I replaced the new PostgreSQL driver with the older one shipped with CCP 5.4.1:

rm -fr /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/postgresql-42.2.19.jar
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.10/postgresql-42.2.10.jar -O /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/postgresql-42.2.10.jar

With the old driver in place, it works again.

mdespriee commented 3 years ago

Same here. Using JdbcSinkConnector, Type: sink, Version: 10.2.0.

Starting from a connector that worked like a charm, adding PARTITION BY RANGE() to the table definition (and creating some partitions) makes it fail:

[hourly_measures_aggregatesdb_sink|task-0] Checking PostgreSql dialect for existence of TABLE "hourly_measures"
[hourly_measures_aggregatesdb_sink|task-0] Using PostgreSql dialect TABLE "hourly_measures" absent
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "hourly_measures" is missing and auto-creation is disabled

Although:

select * from information_schema.tables where table_name like 'hourly%';

 table_catalog | table_schema |       table_name        | table_type | is_insertable_into | is_typed
---------------+--------------+-------------------------+------------+--------------------+----------
 aggregates    | public       | hourly_measures         | BASE TABLE | YES                | NO
 aggregates    | public       | hourly_measures_202107  | BASE TABLE | YES                | NO

(null columns omitted)
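The contradiction above (the table exists, yet the sink reports it missing) comes down to filtering: the driver now reports the table under a type string the connector does not recognize, so the metadata row is discarded. A minimal illustrative sketch of that filtering logic; `TableTypeFilter` and `isVisible` are hypothetical names, not the connector's actual code:

```java
import java.util.Set;

public class TableTypeFilter {
    // Hypothetical helper mirroring how the sink filters JDBC metadata rows
    // by the configured "table.types" values (default: just "TABLE").
    static boolean isVisible(String jdbcTableType, Set<String> allowedTypes) {
        return allowedTypes.contains(jdbcTableType);
    }

    public static void main(String[] args) {
        Set<String> defaults = Set.of("TABLE");
        // pgjdbc <= 42.2.10 reported partitioned tables as plain TABLE: visible.
        System.out.println(isVisible("TABLE", defaults));              // true
        // pgjdbc >= 42.2.11 reports them as PARTITIONED TABLE: filtered out,
        // so the sink concludes the table is absent.
        System.out.println(isVisible("PARTITIONED TABLE", defaults));  // false
    }
}
```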
mdespriee commented 3 years ago

The problem is this change in the underlying pgjdbc implementation: https://github.com/pgjdbc/pgjdbc/commit/25eb32c8681eaa4aaac801808b6028e9f5dfbea8#diff-0571f8ac3385a7f7bb34e5c77f8afd24810311506989379c2e85c6c16eea6ce4L1287

But this does not map nicely to this enum https://github.com/confluentinc/kafka-connect-jdbc/blob/60df0624278a0ee162d7774332fadf03992ea147/src/main/java/io/confluent/connect/jdbc/util/TableType.java#L24.
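One way to close the gap would be to teach the enum about the new type string the driver emits. A rough sketch of such a mapping (this is not the connector's actual `TableType` enum, just an illustration):

```java
import java.util.Locale;

public class TableTypeSketch {
    // Illustrative enum mapping JDBC metadata table-type strings
    // onto the categories a sink connector cares about.
    enum TableType {
        TABLE("TABLE"),
        PARTITIONED_TABLE("PARTITIONED TABLE"),
        VIEW("VIEW");

        private final String jdbcName;

        TableType(String jdbcName) {
            this.jdbcName = jdbcName;
        }

        static TableType fromJdbc(String value) {
            String normalized = value.trim().toUpperCase(Locale.ROOT);
            for (TableType t : values()) {
                if (t.jdbcName.equals(normalized)) {
                    return t;
                }
            }
            throw new IllegalArgumentException("Unknown table type: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(TableType.fromJdbc("TABLE"));
        System.out.println(TableType.fromJdbc("partitioned table"));
    }
}
```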

timtebeek commented 2 years ago

Thanks for the details @mdespriee ; I've added https://github.com/confluentinc/kafka-connect-jdbc/pull/1156 to hopefully resolve this going forward.

timtebeek commented 2 years ago

As of 10.6.0 there's support for partitioned tables again. Update the plugin and pass in your connector configuration: "table.types": "PARTITIONED TABLE,TABLE". @echang0929 can you confirm & close this issue?
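For anyone landing here later, the setting goes into the connector's JSON configuration; a minimal example (the connector name, topic, and connection URL below are placeholders, only "table.types" is the relevant part):

```json
{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/aggregates",
    "topics": "hourly_measures",
    "table.types": "PARTITIONED TABLE,TABLE"
  }
}
```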

weetster commented 10 months ago

For what it's worth, I'm running version 10.7.4 of this connector and specifying those table type values worked for me against a partitioned table in PostgreSQL.