confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

[#1366] - Sink null control (upsert OracleDialect) #1403

Open JorgeRojasDev opened 3 months ago

JorgeRojasDev commented 3 months ago

Problem

In the insert, upsert, and delete queries issued by Sink connectors, the WHERE clause needs to be reimplemented so that a record whose key contains a null value is correctly matched when it already exists in the database. For example: UPDATE ... SET EXAMPLE=1 WHERE KEY_1=2 AND KEY_2 = NULL. Here the predicate KEY_2 = NULL never matches any row, whereas KEY_2 IS NULL matches as expected.
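To make the difference concrete, here is a minimal sketch of rendering a key predicate per record, emitting IS NULL for null key values and a bind placeholder otherwise. The class and method names (KeyPredicate, where) are hypothetical and not part of the connector. This per-record rendering is also not the approach taken in this PR, which rewrites the statement once; it only illustrates why the two predicates behave differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not the connector's actual code.
final class KeyPredicate {

    // Builds "COL = ? AND OTHER IS NULL ..." from the key columns of one record.
    // Non-null values are appended to `bindings` in placeholder order.
    static String where(Map<String, Object> keyValues, List<Object> bindings) {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, Object> entry : keyValues.entrySet()) {
            if (entry.getValue() == null) {
                // "COL = NULL" evaluates to UNKNOWN in SQL, so it never matches.
                parts.add(entry.getKey() + " IS NULL");
            } else {
                parts.add(entry.getKey() + " = ?");
                bindings.add(entry.getValue());
            }
        }
        return String.join(" AND ", parts);
    }
}
```

Because the SQL text now depends on which keys are null, a prepared statement built this way cannot be reused across a batch of records with different null patterns, which is one reason to prefer a single null-safe statement.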

Solution

A new optional behavior is implemented (initially only for the upsert mode of OracleDatabaseDialect) to address this issue. It changes the ON clause of the generated statement: MERGE INTO ... using (SELECT...) incoming ON (KEY_1=incoming."KEY_1" AND KEY_2=incoming."KEY_2")

Since the type cannot be inferred, enabling the boolean parameter enable.null.key.protection rewrites the statement into the following query: MERGE INTO ... using (SELECT...) incoming ON ((KEY_1=incoming."KEY_1" OR (KEY_1 IS NULL AND incoming."KEY_1" IS NULL)) AND (KEY_2=incoming."KEY_2" OR (KEY_2 IS NULL AND incoming."KEY_2" IS NULL)))
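The rewrite of the ON clause can be sketched as a small string builder. This is an illustration under assumptions, not the code in this PR: the class NullSafeOnClause and its build method are hypothetical names, and column quoting is simplified to match the queries shown above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not the dialect's actual code.
final class NullSafeOnClause {

    // Renders the ON clause for the given key columns. When nullKeyProtection
    // is false, each key is a plain equality; when true, each key also matches
    // if both the table column and the incoming value are NULL.
    static String build(List<String> keyColumns, boolean nullKeyProtection) {
        return keyColumns.stream()
            .map(col -> {
                String incoming = "incoming.\"" + col + "\"";
                String equality = col + "=" + incoming;
                if (!nullKeyProtection) {
                    return equality;
                }
                return "(" + equality + " OR (" + col + " IS NULL AND "
                    + incoming + " IS NULL))";
            })
            .collect(Collectors.joining(" AND ", "(", ")"));
    }

    public static void main(String[] args) {
        List<String> keys = List.of("KEY_1", "KEY_2");
        System.out.println(build(keys, false));
        System.out.println(build(keys, true));
    }
}
```

With the flag disabled the output matches the original ON clause, and with it enabled it matches the protected form, so the statement text stays the same for every record regardless of which keys are null.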

This way, we cover the case where a key column is null. I have implemented it behind an additional parameter so that performance is not sacrificed when the protection is not needed.
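As a rough sketch of how the flag might be switched on, the snippet below assembles a partial sink configuration as a Java map. The property enable.null.key.protection is the one proposed in this PR; the remaining keys are existing JDBC sink settings, the class name SinkConfigSketch is hypothetical, and required settings such as the connection URL and topics are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; a real configuration needs further settings
// (connection URL, topics, credentials) that are omitted here.
final class SinkConfigSketch {

    static Map<String, String> upsertWithNullKeyProtection() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
        props.put("insert.mode", "upsert");      // the mode this PR targets
        props.put("pk.mode", "record_key");      // keys come from the record key
        // Proposed in this PR; assumed to default to false when absent.
        props.put("enable.null.key.protection", "true");
        return props;
    }
}
```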

Would it be worthwhile to extend this approach to all upsert, update, and delete operations?

Does this solution apply anywhere else?

Test Strategy

Testing done:

cla-assistant[bot] commented 3 months ago

CLA assistant check
All committers have signed the CLA.