SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, using the Apache Kafka Connect framework for reliably connecting Kafka with SAP systems
Apache License 2.0

Sink connector with upsert mode fails when processing records that already exist in the DB (update) #119

Closed thibthibus closed 2 years ago

thibthibus commented 2 years ago

Hi,

The sink connector failed after having processed a few records of a partition of a topic with the following error:

2022-02-08 10:27:09,226 INFO Creating table:TABLE_NAME with SQL: CREATE COLUMN TABLE "SCHEMA_NAME"."TABLE_NAME" ("id" BIGINT NOT NULL, "name" VARCHAR(5000) NOT NULL, "financialTypeId" BIGINT NOT NULL, "isPositive" BOOLEAN NULL, "eventActionCode" VARCHAR(5000) NULL, "eventActionTimeStamp" TIMESTAMP NULL) (com.sap.kafka.client.hana.HANAJdbcClient) [task-thread-mysinkconnector-0]
2022-02-08 10:27:11,759 INFO Table "SCHEMA_NAME"."TABLE_NAME" exists. Validate the schema and check if schema needs to evolve (com.sap.kafka.connect.sink.hana.HANASinkTask) [task-thread-mysinkconnector-0]
Auto Evolution of schema is not supported (com.sap.kafka.connect.sink.hana.HANASinkTask) [task-thread-mysinkconnector-0]
2022-02-08 10:27:11,759 ERROR Table "SCHEMA_NAME"."TABLE_NAME" has an incompatible schema to the record Schema.

The target HANA table has been created with the following setup of the connector:

auto.create: true
auto.evolve: true
topicname.table.name: "SCHEMA_NAME"."TABLE_NAME"
topicname.insert.mode: upsert
topicname.pk.mode: record_key
topicname.pk.fields: id

Since it worked for a few records, I suspect this is caused by a specific record in the topic. I can't find anything in the connector logs that identifies which one. Is there a way to change the log level (DEBUG?) so that the offset and partition of the record currently being processed are logged?

Thanks

Regards,

Thibaut
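On the log-level question: Kafka Connect workers (Kafka 2.4 and later) expose a REST endpoint, `PUT /admin/loggers/{logger}`, that changes a logger's level at runtime. The sketch below only builds that request for the logger name seen in the log lines above. The worker URL `http://localhost:8083` is an assumption, and the thread does not confirm whether the connector logs the offset and partition at DEBUG, so treat this as a starting point rather than a guaranteed answer.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ConnectLogLevel {
    // Builds (but does not send) the request that asks a Connect worker
    // to change the level of one logger.
    static HttpRequest buildRequest(String workerUrl, String logger, String level) {
        String body = "{\"level\": \"" + level + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/admin/loggers/" + logger))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Assumed worker address; the logger name is taken from the log output above.
        HttpRequest req = buildRequest(
                "http://localhost:8083",
                "com.sap.kafka.connect.sink.hana.HANASinkTask",
                "DEBUG");
        System.out.println(req.method() + " " + req.uri());
        // To apply it, send the request with java.net.http.HttpClient, for example:
        // HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```

The change applies only to the worker that receives the request and is not persisted across restarts, so it suits a one-off debugging session.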

thibthibus commented 2 years ago

Actually, I think the issue is not with the data but with the upsert statement. I checked, and it fails as soon as the connector processes a record in the topic that has the same key as a previously inserted record. How can I check what the problem is? The stack trace doesn't really help.

thibthibus commented 2 years ago

I'm changing the issue title, as I could clearly reproduce the problem: the connector fails when it tries to process records that already exist in the DB. See the attached logs: 1644421263_56969_EE57B850-A1DA-4978-8A09-F01E066A8475.txt

elakito commented 2 years ago

@thibthibus this is related to https://github.com/SAP/kafka-connect-sap/issues/116. The fixed code treats all string-based types as compatible, but other types such as numeric primitives are still checked strictly. I regret not having added a debug line to show which types are compared. I'll make this quick change.

elakito commented 2 years ago

If you can use the build from the current master, which logs the mismatched types in its info log, we can find a solution. I suppose we should also treat other types as compatible as long as the normal JDBC mapping handles the conversion automatically.

thibthibus commented 2 years ago

Hi @elakito, thanks very much for the changes and your responsiveness! I really appreciate it. I wish I understood Scala better and could contribute more, but I can't...

I built and deployed the 0.9.3-SNAPSHOT jar and can see the added log line:

INFO Target type -7 is incompatile with source type 16 (com.sap.kafka.connect.sink.hana.HANASinkTask)

Hope that helps.

Regards,

Thibaut

elakito commented 2 years ago

ok. Thanks for testing. We need to update the code to treat BIT(-7) and BOOLEAN(16) as compatible.
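To illustrate the kind of relaxed check being discussed, here is a minimal Java sketch. It is not the connector's actual Scala code: the class `TypeCompat`, the method `isCompatible`, and the grouping of type codes are hypothetical. Only the `java.sql.Types` constants are real; BIT is -7 and BOOLEAN is 16, matching the logged values.

```java
import java.sql.Types;
import java.util.List;
import java.util.Set;

final class TypeCompat {
    // Hypothetical groups of JDBC type codes treated as interchangeable.
    private static final List<Set<Integer>> GROUPS = List.of(
            // Boolean-like types: the pair reported in this issue.
            Set.of(Types.BIT, Types.BOOLEAN),
            // String-based types, which the thread says are already treated as compatible.
            Set.of(Types.CHAR, Types.VARCHAR, Types.LONGVARCHAR,
                   Types.NCHAR, Types.NVARCHAR, Types.LONGNVARCHAR));

    // Returns true when the codes are equal or belong to the same group.
    static boolean isCompatible(int targetType, int sourceType) {
        if (targetType == sourceType) {
            return true;
        }
        for (Set<Integer> group : GROUPS) {
            if (group.contains(targetType) && group.contains(sourceType)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isCompatible(Types.BIT, Types.BOOLEAN));
        System.out.println(isCompatible(Types.REAL, Types.DOUBLE));
    }
}
```

Keeping the groups explicit means any pair not listed is still checked strictly, so each newly reported mismatch needs a deliberate decision to add it.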

thibthibus commented 2 years ago

Thanks a lot @elakito, now everything is working. When do you plan to release these changes in 0.9.3? I haven't found the jars on Maven Central, for example. Do you publish them somewhere?

elakito commented 2 years ago

@thibthibus Thanks for verifying this fix. We haven't published the binaries to Maven Central yet, but this is planned and we will do it soon. Until now, the binaries have been uploaded to the release area of this repo: https://github.com/SAP/kafka-connect-sap/releases

thibthibus commented 2 years ago

@elakito OK, understood. Do you think you will be able to release 0.9.3 soon? I can't really go into production with a SNAPSHOT release... Thanks in advance. Thibaut

thibthibus commented 2 years ago

Hi @elakito, sorry for asking again. When do you plan to release a newer version of the connector? Thanks, Thibaut

elakito commented 2 years ago

@thibthibus sorry for the delay. I hope to release it by this weekend.

fluenbox commented 1 year ago

Hi, I have the same issue with other data types. The SAP HANA connector creates the table with some smallint fields, but after some inserts it validates the schema again and reports an error asking for another data type: "target type 7 is incompatile with source type 8". To work around this, I changed the data types to integer and it works, but this is not a valid solution in my opinion. Is there a way to deactivate this validation, or is there any other solution? Thanks, and sorry to revive this old thread.
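The numbers in these messages appear to be `java.sql.Types` constants, which is consistent with the earlier reading of -7 as BIT and 16 as BOOLEAN. Under that assumption, the sketch below decodes them with the standard `java.sql.JDBCType` enum; the class name `JdbcTypeNames` is made up for illustration. Codes 7 and 8 then map to REAL and DOUBLE, whereas SMALLINT is 5, so the reported mismatch may concern a floating-point column rather than the smallint fields.

```java
import java.sql.JDBCType;

final class JdbcTypeNames {
    // Maps a java.sql.Types code to its name.
    // Throws IllegalArgumentException for codes that are not JDBC types.
    static String name(int code) {
        return JDBCType.valueOf(code).getName();
    }

    public static void main(String[] args) {
        // Codes quoted in this thread, plus SMALLINT and INTEGER for comparison.
        int[] codes = {-7, 16, 7, 8, 5, 4};
        for (int code : codes) {
            System.out.println(code + " = " + name(code));
        }
    }
}
```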