confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Debezium sink connector - Delete rows doesn't work when source uses ExtractNewRecordState #1421

Open sm0003-alt opened 1 month ago

sm0003-alt commented 1 month ago

I have a Debezium source connector set up for Postgres, and a Debezium JDBC sink connector that syncs the data to another target Postgres database. My source connector config looks like:

{
  "name": "source-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "sa",
    "database.password": "****",
    "database.dbname": "postgres",
    "schema.include.list": "public",
    "table.include.list": "public.devicetype",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://host.docker.internal:9081",
    "key.converter.auto.register.schemas": false,
    "key.converter.use.latest.version": true,
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://host.docker.internal:9081",
    "value.converter.auto.register.schemas": false,
    "value.converter.use.latest.version": true,
    "topic.prefix": "db",
    "transforms": "Unwrap,Reroute",
    "transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.Unwrap.delete.tombstone.handling.mode": "rewrite-with-tombstone",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "([^\\.]+)\\.[^\\.]+\\.([^\\.]+)$",
    "transforms.Reroute.topic.replacement": "$1_$2"
  }
}
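For reference, the Reroute regex above collapses the three-part Debezium topic name (`topic.prefix`.schema.table) into the flat name the sink subscribes to. A minimal check of that substitution, translated to Python syntax (`$1`/`$2` become `\1`/`\2`; the topic name is implied by the configs in this issue):

```python
import re

# ByLogicalTableRouter regex and replacement from the source config above.
topic_regex = r"([^.]+)\.[^.]+\.([^.]+)$"
replacement = r"\1_\2"

# "db" (topic.prefix) + schema + table is the topic name Debezium produces.
print(re.sub(topic_regex, replacement, "db.public.devicetype"))  # -> db_devicetype
```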

I am able to sync data from a Postgres table to a Kafka topic, with the Avro schema registered.

My JDBC sink connector config looks like:

{
    "name": "jdbc-sink-connector",  
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "true",
        "key.converter.schema.registry.url": "http://host.docker.internal:9081",
        "key.converter.auto.register.schemas": false,
        "key.converter.use.latest.version": true,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://host.docker.internal:9081",
        "value.converter.auto.register.schemas": false,
        "value.converter.use.latest.version": true,
        "topics": "db_devicetype",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://postgres/postgres",
        "connection.username": "sa",
        "connection.password": "****",
        "insert.mode": "upsert",
        "delete.enabled": true,
        "primary.key.mode": "record_key",
        "primary.key.fields": "id",
        "schema.evolution": "none",
        "table.name.format": "read_only_device_type",
        "field.exclude.list": "db_devicetype:__deleted"
    }
}

Any inserts or updates on the source db are reflected in the target db. However, deletes are not propagated: rows deleted from the source table remain in the target table. I have tried every possible value of transforms.Unwrap.delete.tombstone.handling.mode. Note that this only happens when I use event flattening (ExtractNewRecordState); deletes work fine without it.
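One way to narrow this down is to inspect what actually lands on the rerouted topic after a `DELETE` on the source table. With `rewrite-with-tombstone` I would expect a rewritten record carrying `__deleted: true` followed by a tombstone (null value); if the tombstone is missing, the sink's `delete.enabled` path never triggers. A sketch of that check, with the bootstrap server assumed and the topic name and Schema Registry URL taken from the configs above:

```shell
# Consume the rerouted topic, printing keys alongside values, to see whether
# the delete produces a __deleted=true record plus a tombstone (null value).
# --bootstrap-server is an assumption; adjust for your environment.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic db_devicetype \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://host.docker.internal:9081
```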

Is there anything I am doing wrong, is this not supported yet, or is this a bug?

sm003ash commented 5 hours ago

Anyone interested in looking into this?