confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

How to migrate enum type to another postgres? #1155

Open hunghoang-ct opened 2 years ago

hunghoang-ct commented 2 years ago

Hi everyone, I'm currently migrating data from one Postgres database to another using Debezium and the JDBC sink connector. My source schema has a table named "film" with a column named "rating" whose type is an enum. When the "rating" data moves from the source database to Kafka via Debezium, it is converted to a string, and the JDBC sink connector cannot convert it back to the enum. Is there any way to migrate an enum type from one Postgres database to another?

Film table Avro schema:

{
  "subject": "dvdrental.public.film-value",
  "version": 1,
  "id": 12,
  "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dvdrental.public.film\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"film_id\",\"type\":{\"type\":\"int\",\"connect.default\":0},\"default\":0},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"release_year\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"language_id\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}},{\"name\":\"rental_duration\",\"type\":{\"type\":\"int\",\"connect.default\":3,\"connect.type\":\"int16\"},\"default\":3},{\"name\":\"rental_rate\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":4,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"4\"},\"connect.default\":\"\\u0001ó\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0001ó\"},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"replacement_cost\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":5,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"5\"},\"connect.default\":\"\\u0007Ï\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"\\u0007Ï\"},{\"name\":\"rating\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"G,PG,PG-13,R,NC-17\"},\"connect.name\":\"io.debezium.data.Enum\"}],\"default\":null},{\"name\":\"last_update\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.default\":0,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"},\"default\":0},{\"name\":\"special_features\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"string\"]}],\"default\":null}],\"connect.name\":\"dvdrental.public.film.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.postgresql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"lsn\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"xmin\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.postgresql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"dvdrental.public.film.Envelope\"}"
}
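
For reference, the enum labels are not lost in transit: the schema above carries them in the "rating" field as `connect.parameters.allowed` under the logical name `io.debezium.data.Enum`. Below is a minimal Python sketch that reads those labels from an unescaped excerpt of that field and builds a `CREATE TYPE` statement, in case the type has to be created on the destination by hand. The type name `mpaa_rating` is taken from the error log further down; the helper names are hypothetical. It assumes the labels appear in declaration order and that no label contains a comma.

```python
import json

# Excerpt of the "rating" field from the Avro schema above, unescaped by hand.
RATING_FIELD = json.loads("""
{
  "name": "rating",
  "type": ["null", {
    "type": "string",
    "connect.version": 1,
    "connect.parameters": {"allowed": "G,PG,PG-13,R,NC-17"},
    "connect.name": "io.debezium.data.Enum"
  }],
  "default": null
}
""")


def enum_labels(field):
    """Return the allowed labels if the field is an io.debezium.data.Enum, else None."""
    field_type = field["type"]
    branches = field_type if isinstance(field_type, list) else [field_type]
    for branch in branches:
        if isinstance(branch, dict) and branch.get("connect.name") == "io.debezium.data.Enum":
            # Assumption: labels are comma-separated and contain no commas themselves.
            return branch["connect.parameters"]["allowed"].split(",")
    return None


def create_type_sql(type_name, labels):
    """Build a CREATE TYPE statement; single quotes inside labels are doubled."""
    quoted = ", ".join("'" + label.replace("'", "''") + "'" for label in labels)
    return f"CREATE TYPE {type_name} AS ENUM ({quoted});"


labels = enum_labels(RATING_FIELD)
ddl = create_type_sql("mpaa_rating", labels)
print(ddl)
```

The generated statement is meant to be reviewed and run manually against the destination database; it does not by itself change how the sink binds the value.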

JDBC sink connector registration:

{"name":"dest-connector-film","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"dvdrental.public.film","connection.url":"jdbc:postgresql://dest-postgres:5432/dvdrental?user=postgres&password=123456","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","auto.create":"true","insert.mode":"upsert","pk.fields":"film_id","pk.mode":"record_value","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://schema-registry:8081","key.converter.enhanced.avro.schema.support":"true","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://schema-registry:8081","value.converter.enhanced.avro.schema.support":"true"}}
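
One thing that may be worth trying with the config above: the PostgreSQL JDBC driver documents a `stringtype` connection parameter, and with `stringtype=unspecified` string parameters are sent to the server untyped so that the server infers the column type (here, the enum). The Python sketch below only shows how the `connection.url` could be amended. The helper name is hypothetical, and whether this resolves the error with this particular sink version is an assumption, not something verified here.

```python
import json


def with_stringtype_unspecified(jdbc_url):
    """Append stringtype=unspecified to a JDBC URL unless a stringtype is already set."""
    if "stringtype=" in jdbc_url:
        return jdbc_url
    separator = "&" if "?" in jdbc_url else "?"
    return f"{jdbc_url}{separator}stringtype=unspecified"


# Only the relevant part of the sink registration is reproduced here.
sink = {
    "name": "dest-connector-film",
    "config": {
        "connection.url": "jdbc:postgresql://dest-postgres:5432/dvdrental?user=postgres&password=123456",
    },
}

sink["config"]["connection.url"] = with_stringtype_unspecified(sink["config"]["connection.url"])
patched_url = sink["config"]["connection.url"]
print(json.dumps(sink, indent=2))
```

Note that this setting applies to every string parameter on the connection, not just the enum column, so other text columns are also sent untyped.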

Error trace log:

Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "dvdrental"."public"."film" ("film_id","title","description","release_year","language_id","rental_duration","rental_rate","length","replacement_cost","rating","last_update","special_features") VALUES (133,'Chamber Italian','A Fateful Reflection of a Moose And a Husband who must Overcome a Monkey in Nigeria',2006,1,7,4.99,117,14.99,'NC-17','2013-05-26 14:50:58.951+00',?) ON CONFLICT ("film_id") DO UPDATE SET "title"=EXCLUDED."title","description"=EXCLUDED."description","release_year"=EXCLUDED."release_year","language_id"=EXCLUDED."language_id","rental_duration"=EXCLUDED."rental_duration","rental_rate"=EXCLUDED."rental_rate","length"=EXCLUDED."length","replacement_cost"=EXCLUDED."replacement_cost","rating"=EXCLUDED."rating","last_update"=EXCLUDED."last_update","special_features"=EXCLUDED."special_features" was aborted: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241
org.postgresql.util.PSQLException: ERROR: column "rating" is of type mpaa_rating but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 241

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
    ... 11 more
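
The hint in the log ("rewrite or cast the expression") points at a destination-side alternative: PostgreSQL's `CREATE CAST ... WITH INOUT AS ASSIGNMENT` can let a `character varying` value be assigned to an enum column. The Python sketch below only derives such a statement from the error line above. The helper name is hypothetical, the statement is untested against this setup, and creating a cast requires suitable ownership of the types involved, so treat the output as something to review by hand.

```python
import re

# Error line copied from the trace above.
ERROR_LINE = (
    'ERROR: column "rating" is of type mpaa_rating '
    "but expression is of type character varying"
)

MISMATCH = re.compile(
    r'column "(?P<column>[^"]+)" is of type (?P<target>\S+) '
    r"but expression is of type (?P<source>.+)$"
)


def assignment_cast_sql(error_line):
    """Turn a type-mismatch error line into a CREATE CAST statement for manual review."""
    match = MISMATCH.search(error_line.strip())
    if match is None:
        raise ValueError("not a type-mismatch error line")
    # The type names are interpolated as-is; do not feed this untrusted input.
    return f"CREATE CAST ({match['source']} AS {match['target']}) WITH INOUT AS ASSIGNMENT;"


cast_sql = assignment_cast_sql(ERROR_LINE)
print(cast_sql)
```

An assignment cast only applies when a value is stored into a column of the target type, which is narrower than an implicit cast and matches the `INSERT` shown in the trace.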

maurolscla commented 2 years ago

Curious about this as well. I have the same problem in a Postgres-to-Postgres migration.