confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Support preferred postgres data type of timestamp with time zone #921

Open jfinzel opened 4 years ago

jfinzel commented 4 years ago

Our biggest problem with the JDBC Sink Connector for Postgres is that it does not support timestamptz (timestamp with time zone), which is the standard and preferred way of storing time in Postgres; in fact, we no longer allow timestamp without time zone at all. There are serious issues with timestamp without time zone, elaborated in the Postgres wiki: https://wiki.postgresql.org/wiki/Don't_Do_This#Don.27t_use_timestamp_.28without_time_zone.29
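
A minimal illustration of the footgun the wiki describes (the literals are hypothetical): timestamp silently discards the offset, while timestamptz preserves the instant.

SELECT '2021-03-01T12:34:56Z'::timestamp;    -- 2021-03-01 12:34:56 (the Z is dropped)
SELECT '2021-03-01T12:34:56Z'::timestamptz;  -- same instant, rendered in the session time zone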

It is possible to create a table manually using timestamp with time zone, but the JDBC Sink Connector always interprets the timestamps as UTC, with apparently no option to configure it differently. The db.timezone property makes no difference. This is evident in the logs, where the hardcoded value for each timestamp has the offset +00 appended, indicating UTC. I have db.timezone set to America/New_York in my config.
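
For reference, the relevant sink settings look roughly like this (connection details are placeholders; db.timezone is the documented property name):

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
db.timezone=America/New_York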

This is using version 5.5.0. I would be grateful for any insight or workaround, and most especially for native support of this data type!

alisator commented 3 years ago

Hello, I encountered the same issue; we use timestamptz. Error message below.

Caused by: org.postgresql.util.PSQLException: ERROR: column "df_created" is of type timestamp with time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.

I found that there is a custom Debezium converter, which has limited precision, but it might work.

Edit: I tried it and it does not work; the issue remains.

marcotollini commented 3 years ago

While we wait for an official solution, a temporary fix is as follows:

CREATE OR REPLACE FUNCTION varchar_to_timestamp(varchar) RETURNS timestamptz AS $$
  SELECT to_timestamp($1, 'YYYY-MM-DD"T"HH24:MI:SS.USZ')
$$ LANGUAGE SQL;

CREATE CAST (varchar AS timestamptz) WITH FUNCTION varchar_to_timestamp(varchar) AS IMPLICIT;

It is far from perfect, but it works.
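
A quick sanity check that the function parses the string format the cast expects (the literal is a hypothetical value):

SELECT varchar_to_timestamp('2021-03-01T12:34:56.789000Z');
-- returns a timestamptz, rendered in the session time zone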

jfinzel commented 3 years ago

@marcotollini thanks for the input. This is similar to our own workaround which is to do no conversions at all on raw data but only through views using postgres built-in functions, which can also be indexed. For example we don't even convert bigint epochs in Kafka but land them as-is and provide conversions through views.
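
For illustration, a sketch of that pattern with hypothetical table and column names; since to_timestamp(double precision) is immutable, the same conversion can also back an expression index:

-- Land the epoch milliseconds as-is from Kafka.
CREATE TABLE raw_events (
  id bigint PRIMARY KEY,
  created_at_epoch bigint
);

-- Expose the converted timestamptz through a view instead of converting on ingest.
CREATE VIEW events AS
SELECT id, to_timestamp(created_at_epoch / 1000.0) AS created_at
FROM raw_events;

-- The same expression can be indexed, as mentioned above.
CREATE INDEX raw_events_created_at_idx
  ON raw_events (to_timestamp(created_at_epoch / 1000.0));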

alisator commented 3 years ago

> While we wait for an official solution, a temporary fix is as follows:
>
> CREATE OR REPLACE FUNCTION varchar_to_timestamp(varchar) RETURNS timestamptz AS $$
>   SELECT to_timestamp($1, 'YYYY-MM-DD"T"HH24:MI:SS.USZ')
> $$ LANGUAGE SQL;
>
> CREATE CAST (varchar AS timestamptz) WITH FUNCTION varchar_to_timestamp(varchar) AS IMPLICIT;
>
> It is far from perfect, but it works.

Hello, we used it, but we added ::timestamp without time zone at time zone 'Etc/UTC'. Because the parsed value carries no time zone information, Postgres saved it incorrectly as local time, which added a +01 offset in our case :) When we forced the time zone to UTC, it behaved correctly in the database with our local time zone :)

CREATE OR REPLACE FUNCTION varchar_to_timestamp(varchar) RETURNS timestamptz AS $$
  SELECT to_timestamp($1, 'YYYY-MM-DD"T"HH24:MI:SS.USZ')::timestamp without time zone AT TIME ZONE 'Etc/UTC'
$$ LANGUAGE SQL;

CREATE CAST (varchar AS timestamptz) WITH FUNCTION varchar_to_timestamp(varchar) AS IMPLICIT;
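
A quick way to verify the fix (session time zone and literal are hypothetical):

SET timezone = 'Europe/Prague';
SELECT varchar_to_timestamp('2021-03-01T12:34:56.789000Z');
-- 2021-03-01 13:34:56.789+01: the UTC instant is preserved instead of being
-- shifted by the local offset, as it was without the "at time zone 'Etc/UTC'" step.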

max-prosper commented 3 years ago

I managed to solve this by replacing the original timestamp with the timestamp of the Kafka message itself. Use the ReplaceField and InsertField SMTs like this:

transforms=replaceField,insertField
transforms.replaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceField.blacklist=last_updated_at
transforms.insertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertField.timestamp.field=last_updated_at

But it only works for last_updated_at-style fields, and only if the difference between the original timestamp and the Kafka message timestamp (which can be more than a second) doesn't matter for your business logic.

pmatheson-greenphire commented 3 years ago

I used the JDBC connection parameter ?stringtype=unspecified to solve the character varying to timestamptz issue on PG 10.

https://jdbc.postgresql.org/documentation/head/connect.html#connection-parameters If I'm reading it right, this allows the server to select the type to insert and ignores the bound type of the JDBC client's prepared statement?
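
For example, appended to the sink's connection.url (host and database name are placeholders):

connection.url=jdbc:postgresql://localhost:5432/mydb?stringtype=unspecified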

dungnt081191 commented 3 years ago

Hi @jfinzel @max-prosper @alisator @marcotollini @pmatheson-greenphire, I have the same issue. Could you please take a look for me? https://github.com/confluentinc/kafka-connect-jdbc/issues/1127

asmoker commented 9 months ago

> I used the JDBC connection parameter ?stringtype=unspecified to solve the character varying to timestamptz issue on PG 10.
>
> https://jdbc.postgresql.org/documentation/head/connect.html#connection-parameters If I'm reading it right, this allows the server to select the type to insert and ignores the bound type of the JDBC client's prepared statement?

After adding this param, I encountered another error:

org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT