confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Sink connector in upsert mode overwrites existing values in Postgres with null values for fields not provided #1067

Open Godzillarissa opened 3 years ago

Godzillarissa commented 3 years ago

I have this Postgres database table with the fields "id", "flavor" and "item". I also have a Kafka topic with two messages (this is a first test, so I put everything in the value, leaving the key empty):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "id",
        "type": "int64",
        "optional": false
      },
      {
        "field": "flavor",
        "type": "string",
        "optional": true
      },
      {
        "field": "item",
        "type": "string",
        "optional": true
      }
    ]
  },
  "payload": {
    "id": 1,
    "flavor": "chocolate"
  }
}
{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "id",
        "type": "int64",
        "optional": false
      },
      {
        "field": "flavor",
        "type": "string",
        "optional": true
      },
      {
        "field": "item",
        "type": "string",
        "optional": true
      }
    ]
  },
  "payload": {
    "id": 1,
    "item": "cookie"
  }
}

Now I'd like to use the Confluent JDBC (Sink) Connector for persisting the kafka messages in UPSERT-mode, hoping to get to the following end result in the database:

id | flavor    | item
----------------------
1  | chocolate | cookie

What I did get, however, was this:

id | flavor    | item
----------------------
1  | null      | cookie
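
For completeness, the sink connector is configured roughly like this (topic name and connection details are just placeholders):

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "items",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id"
  }
}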

The sink connector apparently uses a statement like "INSERT .. ON CONFLICT .. DO UPDATE SET ..", right? I would have assumed that if I don't provide a field in the Kafka message itself (i.e. leave it out, since it's optional), it wouldn't be included in the SET clause.

The way it works out, though, the missing fields seem to be filled in with null, so that null is included in the SET clause and plastered over my existing data.
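
For illustration, with my example table (let's call it "items") I would expect the statement generated for the second message to look roughly like this:

INSERT INTO items (id, flavor, item)
VALUES (1, NULL, 'cookie')
ON CONFLICT (id) DO UPDATE SET
  flavor = EXCLUDED.flavor,  -- NULL, because the second message omits "flavor"
  item = EXCLUDED.item;

which is exactly why the existing "chocolate" gets wiped out.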

Is there a way to "partially" update my records in upsert mode (i.e. not set any value that is not provided in the Kafka message)?

OneCricketeer commented 3 years ago

Guess the solution would involve filtering these nonKeyColumns

Godzillarissa commented 3 years ago

That might have been a good place to start, but unfortunately I won't be able to try it out. My team has decided to process the data earlier in the pipeline, using a KTable, as that seems a bit more flexible and doesn't involve as much digging into the inner workings of the connector itself.

matiasdsalerno commented 3 years ago

I had the same issue with MySQL. I ended up creating my own consumer that writes into MySQL, only mapping the columns that are present in the message...
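
Roughly speaking, for a message like the second one above, the consumer only includes the columns that are actually present in the statement it builds, something along these lines (table name is illustrative):

INSERT INTO items (id, item)
VALUES (1, 'cookie')
ON DUPLICATE KEY UPDATE
  item = VALUES(item);  -- "flavor" is left out entirely, so the existing value survives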

It's actually tricky, because in some cases you have fields that are missing from the object precisely because they are meant to be set to null. This doesn't happen with JSON, but I understand it does with Avro, where fields can't be missing but can be set to null or undefined.

Maybe something like this could be added as a new mode for the insert.mode property:

format      | missing field behavior | null field behavior | undefined field behavior
--------------------------------------------------------------------------------------
JSON Schema | ignore column          | set column to null  | N/A
Avro        | N/A                    | set column to null  | ignore column
Protobuf    | N/A                    | set column to null  | ignore column

I would actually need to figure out how exactly Protobuf and Avro behave.

Sheikhin commented 7 months ago

I agree with @matiasdsalerno

I am using the JDBC sink connector and I have the same issue with a Postgres DB in upsert mode. I ended up creating two connectors that update their respective fields separately, which lets us patch the row. Having to set null explicitly for every field that isn't sent would be an overhead, so at the very least we should have a configuration option for this.
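
Roughly, each connector keeps only the fields it is responsible for, e.g. with a ReplaceField transform (connector name, topic and field lists here are just illustrative; on newer Kafka versions the setting is called "include" instead of "whitelist"):

{
  "name": "items-flavor-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "items",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "transforms": "keepFlavor",
    "transforms.keepFlavor.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.keepFlavor.whitelist": "id,flavor"
  }
}

The second connector looks the same except that its whitelist is "id,item".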