hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0
153 stars 60 forks source link

Generating unwanted key for Delete #141

Closed nandorsilva closed 5 months ago

nandorsilva commented 6 months ago

I have a connector syncing with mongo, for inclusion and update it generates the correct key with the key of the kafka message, however when it is to delete the record in mongodb it generates the key with the duplicate id.

'{"delete": "Produtos", "ordered": true, "deletes": [{"q": {"_id": {"_id": 28}}, "limit": 1}]}

Follow my connector and log settings

{
    "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max" : "1",
    "connection.uri": "mongodb://mongo-connect:27017",
    "database": "db"  ,
    "collection": "Produtos" ,  
    "topics": "produtos",
    "max.num.retries": 1, 
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false ,
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
    "document.id.strategy.partial.key.projection.list":"_id",
    "document.id.strategy.partial.key.projection.type":"AllowList",
    "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "delete.on.null.values": true,  
    "mongodb.delete.on.null.values": true,
    "document.id.strategy.overwrite.existing":true 

}

Kafka message has this format

//Update ou Insert
{"_id":28}      {"_id":28,"Id":28,"Nome":"Lapis novo","Descricao":"dededwdwddwedewe88weo","Valor":1.99,"Quantidade":2,"ValorTotal":3.98}

//Delete
{"_id":28}      null

Log

Sending command '{"delete": "Produtos", "ordered": true, "deletes": [{"q": {"_id": {"_id": 28}}, "limit": 1}]}' with request id 667 to database db on connection [connectionId{localValue:35, serverValue:138}] to server mongo-connect:27017 (org.mongodb.driver.protocol.command:56)

hpgrahsl commented 6 months ago

Hi @nandorsilva

THX for reaching out with your question. After integrating this code base into the official MongoDB connector for Apache Kafka several year ago, this repo is basically in maintenance mode ever since then. According to your configuration "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector" you're using builds of the official connector which is why I'd recommend to bring your question directly into the MongoDB community forum. This is the right place to discuss things like this https://www.mongodb.com/community/forums/tags/c/data/connectors-integrations/48/kafka-connector

Thanks for your understanding and all the best for your kafka connect project!