confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Postgis support using Debezium #764

Open daanroosen-DS opened 4 years ago

daanroosen-DS commented 4 years ago

Hello everyone,

I am using the JDBC sink connector together with Debezium. At the moment I am facing the following issue: our data contains PostGIS data types, which are supported by Debezium and Kafka Connect, but not (yet) by the JDBC connector.

The following JSON is an example of a Debezium message containing a Postgis type field, namely Geometry.

```json
{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"bytes",
                  "optional":false,
                  "field":"wkb"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "field":"srid"
               }
            ],
            "optional":true,
            "name":"io.debezium.data.geometry.Geometry",
            "version":1,
            "doc":"Geometry",
            "field":"geometry"
         },
         {
            "type":"int64",
            "optional":true,
            "name":"org.apache.kafka.connect.data.Timestamp",
            "version":1,
            "field":"trans_timestamp"
         },
         {
            "type":"string",
            "optional":false,
            "field":"operation"
         }
      ],
      "optional":false,
      "name":"server.schema.table.Value"
   },
   "payload":{
      "geometry":{
         "wkb":"AQYAACCKegBAAQAAAAEDAAAAAQAAAOYFAADHxZRIf...",
         "srid":40000
      },
      "trans_timestamp":1576498086666,
      "operation":"r"
   }
}
```

As you can see, the field is actually a struct containing two parts, wkb and srid. When I try to process this with the JDBC connector, it fails with type errors, among other problems.

I think full support for PostGIS types would be a nice enhancement to the JDBC connector.

In the meantime, are there any suggestions for a workaround?

Thanks in advance.

gharris1727 commented 4 years ago

@daanroosen-DS Support for complex data types is not planned at this time, but if you have an idea on what the functionality would look like, we would be happy to review a feature PR.

As far as working around this restriction, you're going to need to do some preprocessing on the record before it gets to the JDBC connector, such as in an SMT. I think that you may be able to use the Flatten transformation to make the nested fields into root-level fields. Alternatively you may be able to write your own custom SMT to transform the data however you want, such as into an embedded JSON string.
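The Flatten suggestion can be sketched as connector configuration. This is a hedged example: the SMT class is the standard Kafka Connect `org.apache.kafka.connect.transforms.Flatten`, but the transform alias and delimiter choice here are assumptions, not tested against this connector.

```json
{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}
```

With something like this in place, the nested `geometry.wkb` and `geometry.srid` fields would reach the sink as root-level `geometry_wkb` and `geometry_srid` fields, though the sink would still treat them as plain bytes/int32 rather than as a PostGIS geometry.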

ekremtoprak commented 4 years ago

I am facing the same issue. I hope complex data type support is planned soon.

daanroosen-DS commented 4 years ago

@gharris1727 Thanks for your quick response.

Unfortunately, I do not know the PostGIS types well enough to describe what the functionality should look like. However, I think the easiest way to make it work is to cast the wkb string to geometry in the insert.

For example: insert into tablename(geometrie) values('01060000208A7A000000000000'::geometry);

To the best of my knowledge, the SRID is not needed in the insert; it is only needed when creating the column:

create table tablename (geometrie geometry(MultiPolygon,SRID))

For me, the best way forward would be to try to write an SMT that extracts the wkb part and parses it as a string. As a consequence, the target field will need to be text, because you can't insert a string into a geometry field directly (without a cast).
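The byte-level step such an SMT would have to perform can be sketched in plain Java. This is a hypothetical helper, not connector code: with the JSON converter, Debezium serializes the `wkb` bytes as base64, while the `::geometry` cast shown above expects the hex form.

```java
import java.util.Base64;

public class WkbHex {

    // Decode the base64 "wkb" field of a Debezium Geometry struct into
    // the hex string PostgreSQL accepts for a ::geometry cast.
    static String wkbToHex(String wkbB64) {
        byte[] bytes = Base64.getDecoder().decode(wkbB64);
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Helper for the demo below: parse a hex string back into bytes.
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical example value: WKB for POINT(1 2), base64-encoded
        // the way it would appear in the JSON payload above.
        String hex = "0101000000000000000000f03f0000000000000040";
        String b64 = Base64.getEncoder().encodeToString(hexToBytes(hex));
        System.out.println(wkbToHex(b64).equals(hex)); // prints "true"
    }
}
```

The resulting hex string could then be written into a text column, or cast in the insert as in the example above.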

Karthik248 commented 4 years ago

@daanroosen-DS, were you able to work out an SMT to cast geometry fields?

rgannu commented 4 years ago

Also, when you implement PostGIS data types, please add support for arrays of PostGIS data types.

rgannu commented 3 years ago

I stumbled upon this issue last year when I started using Debezium for our needs. Our sink connector does not write back to a DB but to AMQP, so I set this issue aside.

I feel this shouldn't be handled in an SMT, since the PostGIS data types are always going to be there; an SMT that has to open the payload every time adds too much overhead.

There are two options in my opinion:

  1. Add the Debezium core library to the Kafka JDBC sink connector and support PostGIS data types.
  2. Create a new JDBC sink connector with support for PostGIS data types.

The first approach is what I had done temporarily. I can dust it off, refine it, and submit it as a PR for this issue (if that is fine with the reporter). Please let me know.

dorocoder commented 3 years ago

> I stumbled upon this issue last year when I started using the debezium for our need. Our sink connector is not back to DB but rather to AMQP and hence I left in between this issue.
>
> I feel this shouldn't be handled in SMT as the postgis data types are always going to be there. We will be putting too much work load on the SMT which must open the payload all the time.
>
> There are 2 options in my opinion.
>
>   1. Add the debezium core library to the Kafka JDBC sink connector and support postgis datatypes.
>   2. Create a new JDBC sink connector with the support of postgis datatypes.
>
> The first approach is what I had temporarily done. I can dust and refine and submit as a PR to this issue. (if this is fine for the reporter). Please let me know.

I'm one of those suffering from the same issue. You might have worked this out by modifying a single class: https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java. If so, you could propose it as an alternative to PostgreSqlDatabaseDialect. I suggest you post your makeshift version here in the first place; it would be a big help for users working with Debezium and GIS columns.
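For anyone attempting the PostgreSqlDatabaseDialect route, the shape of the change might look roughly like the fragment below. This is a hypothetical sketch, not the actual PR: it assumes the dialect's `getSqlType(SinkRecordField)` hook and only covers the column type mapping; binding the actual `wkb` + `srid` value would still need separate logic.

```java
// Hypothetical fragment inside PostgreSqlDatabaseDialect; not the actual PR.
@Override
protected String getSqlType(SinkRecordField field) {
  if ("io.debezium.data.geometry.Geometry".equals(field.schemaName())) {
    // Map Debezium's logical Geometry type to a PostGIS column type.
    return "geometry";
  }
  return super.getSqlType(field);
}
```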

rgannu commented 3 years ago

@dorocoder Unfortunately I lost the source code and only have the JAR file. That's why I couldn't submit the PR immediately. I am redoing it now and will submit it this weekend.

rgannu commented 3 years ago

I have submitted PR #1048 for review.

rgannu commented 3 years ago

Can someone please review my submitted PR? Thanks.

psujit775 commented 2 years ago

Having the same issue. @daanroosen-DS, have you been able to resolve it?

psujit775 commented 2 years ago

@rgannu I built the JAR from your PR #1048 and replaced the original one. It's working great for the geometry data type.

Thank you from the bottom of my heart ❤️

Dordor333 commented 1 year ago

> @rgannu built jar from your PR #1048 and replcaed with the original one. It's working great for geometry data type.
>
> Thank you from the bottom of my heart ❤️

Where can I get the new JAR for the Postgres sink (not the Postgres source)?

abhishekporter commented 11 months ago

Can you guys please share the JAR files here?

sm003ash commented 4 months ago

The PR is still open. Are there any plans to merge it? We need it to sink PostGIS data from Kafka to Postgres; currently there is no way to do this.