hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

Replace strategy setting BsonId #118

Closed: lazaromedina closed this issue 4 years ago

lazaromedina commented 4 years ago

Hi, on version 1.3.1, is it possible to use the key provided in the value together with the replace write model, but have the key inserted as a BSON ObjectId rather than as a string?

{
…
"mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy", // replaces the content if "_id" already exists in MongoDB
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy" // picks up "_id" from the message
…
}

Actual behaviour:

Event 0, Kafka message 0:

{"_id": "5e56539be1863fe90d324877", "otherkey": "value"}

MongoDB document:

{
    "_id" : "5e56539be1863fe90d324877", // string
    "otherkey": "value"
}

Event 1, Kafka message 1:

{"_id": "5e56539be1863fe90d324877", "otherkey": "newValue"}

MongoDB document:

{
    "_id" : "5e56539be1863fe90d324877", // string
    "otherkey": "newValue"
}

Desired behaviour:

Event 0, Kafka message 0:

{"_id": "5e56539be1863fe90d324877", "otherkey": "value"}

MongoDB document:

{
    "_id" : ObjectId("5e56539be1863fe90d324877"), // ObjectId
    "otherkey": "value"
}

Event 1, Kafka message 1:

{"_id": "5e56539be1863fe90d324877", "otherkey": "newValue"}

MongoDB document:

{
    "_id" : ObjectId("5e56539be1863fe90d324877"), // ObjectId
    "otherkey": "newValue"
}

Thanks in advance!

Best, Luis

hpgrahsl commented 4 years ago

Hi @lazaromedina

Thanks for reaching out and for clearly explaining your use case with sample messages and documents. My question, though, is this: if the _id field in the Kafka message always contains an ObjectId-compliant 24-character hex string, why isn't it serialized as extended JSON using $oid (see https://docs.mongodb.com/manual/reference/mongodb-extended-json-v1/#oid)? Then it would work as is.
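To illustrate the extended JSON suggestion: if the producer writes the _id as {"$oid": "<24 hex chars>"} instead of a bare string, the sink should receive an ObjectId rather than a string. The Java sketch below is a hypothetical producer-side helper (the class and method names are made up for illustration, and the JSON is built by hand only to keep it dependency-free); a real producer would normally use a JSON library.

```java
import java.util.regex.Pattern;

// Hypothetical producer-side helper: wraps a raw ObjectId hex string
// in extended JSON v1 form before the message is sent to Kafka.
class OidWrapper {
    // A 24-character hexadecimal string is the textual form of a 12-byte ObjectId.
    private static final Pattern OBJECT_ID_HEX = Pattern.compile("^[0-9a-fA-F]{24}$");

    static boolean isObjectIdHex(String s) {
        return s != null && OBJECT_ID_HEX.matcher(s).matches();
    }

    // Renders the id as {"$oid": "<hex>"}; rejects anything that is not ObjectId hex.
    static String toExtendedJsonId(String hex) {
        if (!isObjectIdHex(hex)) {
            throw new IllegalArgumentException("not an ObjectId hex string: " + hex);
        }
        return "{\"$oid\": \"" + hex + "\"}";
    }

    // Escapes backslashes and double quotes only; control characters are not
    // handled, which is one reason to prefer a real JSON library in practice.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the message value with _id in extended JSON form.
    static String buildValue(String idHex, String otherValue) {
        return "{\"_id\": " + toExtendedJsonId(idHex)
                + ", \"otherkey\": \"" + escape(otherValue) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(buildValue("5e56539be1863fe90d324877", "value"));
    }
}
```

Running `main` prints `{"_id": {"$oid": "5e56539be1863fe90d324877"}, "otherkey": "value"}`, which has the shape the maintainer refers to.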

Since it's only the raw hex string, there is currently no way to support this out of the box. However, you have two options to make it work without much effort, although both involve writing some code:

a) write a custom SMT (single message transform) and add its configuration to your connector config
b) write a custom post-processor for the MongoDB sink connector, which is probably even easier than a) for your use case
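Option b) boils down to one check: after the id strategy has taken "_id" from the value, replace it with an ObjectId if it is a 24-character hex string. The sketch below shows only that logic, using a plain `Map` as the value document and a hypothetical `ObjectIdValue` stand-in so it runs with the JDK alone. In the actual connector, the assumption is that this would live in a subclass of the sink connector's post-processor base class, operate on the BSON value document, use the BSON library's ObjectId type, and be registered in the connector's post-processor chain setting; check the project's README for the exact class and method signatures. The same check could also be placed inside a custom SMT for option a).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the post-processing step, not the connector's API.
class HexIdToObjectIdProcessor {
    private static final Pattern OBJECT_ID_HEX = Pattern.compile("^[0-9a-fA-F]{24}$");

    // Hypothetical stand-in for a BSON ObjectId value; a real post-processor
    // would use the BSON library's ObjectId type instead.
    static final class ObjectIdValue {
        final String hex;

        ObjectIdValue(String hex) {
            this.hex = hex;
        }

        @Override
        public String toString() {
            return "ObjectId(\"" + hex + "\")";
        }
    }

    // Mutates the value document in place: a String "_id" that is valid
    // ObjectId hex is replaced by an ObjectIdValue; anything else is left as is.
    static Map<String, Object> process(Map<String, Object> valueDoc) {
        Object id = valueDoc.get("_id");
        if (id instanceof String && OBJECT_ID_HEX.matcher((String) id).matches()) {
            valueDoc.put("_id", new ObjectIdValue((String) id));
        }
        return valueDoc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "5e56539be1863fe90d324877");
        doc.put("otherkey", "newValue");
        System.out.println(process(doc));
    }
}
```

Running `main` prints `{_id=ObjectId("5e56539be1863fe90d324877"), otherkey=newValue}`. Because the replace write model matches on "_id", converting it consistently on every record means later updates target the same ObjectId-keyed document.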

Of course, I'd be happy to receive a PR that integrates this "new" post-processor into the project 😉 If you need any further information, just let me know.

lazaromedina commented 4 years ago

Hi @hpgrahsl,

Thanks so much for the good work and your commitment to this project. For now, I have solved this issue by handling the _id as a string. However, as you pointed out, for a more definitive solution in the future I would follow option b). When I do, it will be my pleasure to make a PR.

Again, I really appreciate the support.

BR Luis

hpgrahsl commented 4 years ago

Great to hear you got it working for your needs! If you don't mind, I'd be happy if you could close this issue for now. Feel free to reopen it anytime once you start working on a definitive solution.