hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

CDC and ReplaceOneBusinessKeyStrategy #106

Closed · mottish closed this issue 4 years ago

mottish commented 4 years ago

Hi @hpgrahsl,

Firstly, thank you for all the support and effort you put in this great connector!

I have a question / issue with the MongoDB sink and Debezium MongoDB CDC. My source data comes from MongoDB via the Debezium CDC connector, and I'm using your connector to write the data back to a different MongoDB replica set. The issue is that one of the collections has a unique index based on a business key named "externalId".

I tried to configure the connector to use this key, but it still complains about duplicate key entries.

My config:

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": true,
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
  "topics.regex": "myTopic.*",
  "mongodb.connection.uri": "mongodb://myDB:27017/DB?w=1&journal=true",
  "mongodb.ssl.enabled": true,
  "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
  "mongodb.collections": "${topic}",
  "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
  "mongodb.key.projection.list": "externalId",
  "mongodb.key.projection.type": "whitelist",
  "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3"
}
```

The error I get:

```
[BulkWriteError{index=88, code=11000, message='E11000 duplicate key error collection: DB.collection index: externalId_1 dup key: { : "6c02d106c24d015d2ffe73585160ef83" }', details={ }},
 BulkWriteError{index=422, code=11000, message='E11000 duplicate key error collection: DB.collection index: externalId_1 dup key: { : "6c02d106c24d015d2ffe73585160ef83" }', details={ }}]
```

Does the ReplaceOneBusinessKeyStrategy support the CDC schema? Since the source data is based on the Debezium schema, how can I tell the write model strategy to use my "externalId" field as the key?

hpgrahsl commented 4 years ago

Hi @mottish - short answer: no, this isn't possible at the moment. Let me briefly explain why:

1) When you leverage CDC mode, it's typically because you want to react to the semantics of the CDC records, i.e. perform upserts or deletions accordingly. This means that, depending on the actual CDC record, one specific CDC write model is already in place and performs its operation against the sink. It doesn't make sense to mix these CDC write model semantics with other possible write models.

2) Also, when CDC is in place, no post-processing pipeline gets applied either. Again, the typical use case is to take the records as is and perform the write model operations 1:1, i.e. without modifying the payload along the way.

Does that make sense to you?

mottish commented 4 years ago

Hi @hpgrahsl, that makes sense, and CDC should handle it perfectly. In my case, due to the high volume of updates written across many Kafka partitions, I can't guarantee the order of the messages, and therefore I reach the point where updates fail because of inconsistencies in the object _id.

Do you have any advice on how I can overcome this besides working with a single partition (which impacts performance)?

hpgrahsl commented 4 years ago

Hi @mottish

I'm not sure if I understand you correctly. Ordering guarantees only exist if you provide a proper key for each record so that related records land in the same topic partition. If the key used to produce records to the Kafka topics is e.g. the object id in _id, and you want ordering based on some other unique payload field, then it simply won't work like that.

IMHO the only reasonable way to deal with this is to make sure that your records are written to the Kafka topics with the correct key in the first place, which then also allows you to have any number of partitions that makes sense for you. That can be done with Kafka Connect single message transforms (SMTs); concretely, ValueToKey is your friend here:

https://docs.confluent.io/current/connect/transforms/valuetokey.html#valuetokey
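
For illustration, a ValueToKey configuration on the source connector could look like the snippet below (the transform alias and field name are just placeholders for this example). Note, however, that ValueToKey expects the field at the top level of the record value, which is not the case for the raw Debezium MongoDB envelope:

```json
{
  "transforms": "createKey",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "externalId"
}
```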

Let me know if this was what you were looking for and whether or not it helps to solve your problem.

If not, can you explain further please?

mottish commented 4 years ago

I'll try to explain my issue better. Assume I have one collection named "myCol" that is produced to Kafka using the Debezium MongoDB connector with JSON schema. Each document has its own "_id" and a business key named "myId", where "myId" is unique.

I consume the Debezium messages from Kafka into a different MongoDB replica set using @hpgrahsl's MongoDB sink connector.

If a DELETE document operation is sent to partition 1 and a CREATE document operation is sent to partition 2 with the same business key (myId) but a different _id (since the document has first been deleted), and the sink connector tries to run the CREATE operation before the DELETE operation, it will fail on the unique key constraint on the destination MongoDB, since the business key myId still exists.

So IMHO, either I use a single partition to retain the ordering of messages, or I produce by key (if Debezium or Kafka Connect supports it); this way ordering will be preserved.

I hope that clears my case.

hpgrahsl commented 4 years ago

Hi @mottish, yes, then I understood you correctly and my answer above is the way to go. Try to make sure that, by means of a proper SMT, your records are written to the Kafka topic with the business key "myId". If an existing SMT doesn't work, you can write a custom one, which might be needed for your use case. I'll close this issue since the actions to be taken are outside of the control of the sink connector itself.

mottish commented 4 years ago

Appreciate your help! Thank you.

hpgrahsl commented 4 years ago

you are welcome. good luck with your efforts!

mottish commented 4 years ago

Hi @hpgrahsl, unfortunately I'm still stuck with this issue. I've tried your proposed solution of using the ValueToKey SMT, but since I'm using Debezium as the source connector to send the source MongoDB CDC events to Kafka, I first need to apply the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope SMT in order to read the message. Doing so eventually does use my key to send messages to Kafka, but it doesn't preserve the Debezium CDC schema, and so it breaks the chain...

Before changing the key to my own "externalId" (record key, then record value):

```json
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"dbserver5.inventory.customers.Key"},"payload":{"id":"2056"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"ns"},{"type":"int32","optional":false,"field":"sec"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"boolean","optional":true,"default":false,"field":"initsync"}],"optional":false,"name":"io.debezium.connector.mongo.Source","version":1,"field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver5.inventory.customers.Envelope"},"payload":{"after":"{\"_id\" : {\"$numberLong\" : \"2056\"},\"first_name\" : \"Ann56\",\"externalId\" : \"ext260\",\"email\" : \"annek26@noanswer.org\"}","patch":null,"source":{"version":"0.9.5.Final","connector":"mongodb","name":"dbserver5","rs":"rs0","ns":"inventory.customers","sec":1572784855,"ord":1,"h":3453804412024348345,"initsync":false},"op":"c","ts_ms":1572784855589}}
```

After using the unwrap SMT and ValueToKey with my "externalId", the schema changes (record key, then record value):

```json
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"externalId"}],"optional":false},"payload":{"externalId":"ext350"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"first_name"},{"type":"string","optional":true,"field":"externalId"},{"type":"string","optional":true,"field":"email"},{"type":"int32","optional":true,"field":"id"}],"optional":false,"name":"dbserver5.inventory.customers"},"payload":{"first_name":"Ann53","externalId":"ext350","email":"annek35@noanswer.org","id":2053}}
```

The config I used at the source:

```json
{
  "transforms": "unwrap,createKey",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "externalId",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "include.schema.changes": "false"
}
```

I'm kind of desperate here, and I apologize in advance for reaching out to you with this issue, but you have a lot of knowledge working with Debezium, so I'm trying my luck here as well.

Thanks!

mottish commented 4 years ago

I would like to update my issue; I believe it will be of interest to others as well. I thought of writing our own SMT that changes the key without breaking the Debezium schema and without using the UnwrapFromMongoDbEnvelope SMT. This will work for all operations besides "delete", which carries only the document _id key and doesn't have the "after" field where we could find our own key field (externalId in my case).

Without maintaining the order of deletes in the same topic partition as the other operations, I will be in the same spot.

Hence, the only valid solutions I have are:

  1. Change the document _id to be "externalId", and the whole flow will work "out of the box" with @hpgrahsl's sink connector and the Debezium source connector.
  2. Write my own SMT to change the key without breaking the schema, and use a Kafka Streams app to maintain a mapping of document _id to "externalId" and replace the key for delete operations.

Option 2 complicates the solution and also doesn't provide a real-time ordering guarantee...

Bottom line, it doesn't seem feasible to manipulate the key using the Debezium source (or any other source), as the delete operation has only the document _id as the key...

hpgrahsl commented 4 years ago

hi @mottish

You are right that the DBZ MongoDB source connector is a special case concerning delete operations. Other DBZ source connectors expose a before field for delete operations, which you could leverage to extract your other id field, but here that's unfortunately not possible.

The way to go in this case is to write a Kafka Streams app in which you maintain a mapping between the _id contained in all DBZ source events and your desired id field (a rough sketch of that idea follows below). Yes, this is a bit more involved than only applying SMTs and connector configuration, but it should be feasible without too much hassle. What I don't really understand though is this part: '...doesn't provide realtime ordering guarantees' - what does this even mean? Can you explain further please?
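
Just to illustrate the idea, a rough Kafka Streams sketch could look like the following. Topic names, serdes, and the JSON handling are placeholders for this example only, not code from the connector or from Debezium:

```java
// Rough sketch only: re-key Debezium MongoDB change events by a business key
// ("externalId") and remember _id -> externalId in a state store so that
// delete events (which carry only _id) can be re-keyed too.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class RekeyByBusinessKeyApp {

    static final String STORE = "id-to-business-key";

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String()));

        builder.<String, String>stream("dbserver5.inventory.customers")
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, String> mapping;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    mapping = (KeyValueStore<String, String>) context.getStateStore(STORE);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    if (value == null) {
                        // tombstone: forward it under the remembered business key, if any
                        String remembered = mapping.get(key);
                        return remembered == null ? null : KeyValue.pair(remembered, (String) null);
                    }
                    String businessKey = extractExternalId(value); // null for deletes (no "after")
                    if (businessKey != null) {
                        mapping.put(key, businessKey);             // remember _id -> externalId
                        return KeyValue.pair(businessKey, value);
                    }
                    // delete (or unparseable) event: fall back to the stored mapping
                    String remembered = mapping.get(key);
                    return KeyValue.pair(remembered != null ? remembered : key, value);
                }

                @Override
                public void close() { }
            }, STORE)
            .to("customers.rekeyed");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekey-by-business-key");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }

    // Placeholder: a real implementation would parse the Debezium envelope's
    // "after" JSON string with a JSON library and return its "externalId" field.
    private static String extractExternalId(String envelopeJson) {
        return null; // parsing intentionally omitted in this sketch
    }
}
```

The point is simply that the state store lets delete events, which only carry the _id, be forwarded under the business key as well.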

gunnarmorling commented 4 years ago

Hi, Gunnar here from the Debezium team. Indeed I think that you are a bit out of luck when it comes to deletes, which currently don't carry any payload information. So you wouldn't have a way to send the change events for them using your business key.

The one solution that would work is if you were to use your business key as the actual primary key in the MongoDB source collection. If you can't change the actual source collection accordingly, perhaps you can implement some logic which creates a duplicated "view" of the original collection that's keyed by "myId", and then you capture that collection? That's the one way I could see that'd work.

It would also be interesting to know whether the change streams feature introduced in MongoDB 3.x would expose the deleted document's state. We're planning to support that in Debezium, too, as an alternative to the current oplog-tailing based solution. This might be another incentive to do that.

gunnarmorling commented 4 years ago

Ah, I forgot to comment on @hpgrahsl's KStreams suggestion. Unfortunately, I don't think it'll be a fully satisfying answer.

We blogged about a very similar use case recently over on the Debezium blog. It uses a state store which allows you to amend incoming partial change events with the full record state. You could do the same to enrich MongoDB delete events with the latest document state (order is guaranteed based on document id).

But you'd still have the potential ordering issue in the DELETE/CREATE use case; any implementation which changes the message id from the original document id to that other id only after the fact will be prone to this, as the incoming events with the original ids may end up in different partitions. Maybe you can rule out re-use of the business key (at least for some blocking period) in your business logic to mitigate this issue?

gunnarmorling commented 4 years ago

Taking the state store idea further, what might work is if you indeed had a custom SMT which, in case of a delete, obtains the full document state from that state store. That way you could emit the change events with the right key from the beginning. Ordering of events would be as per the ordering of the transactions in the oplog.
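
To make that idea a bit more concrete, here is a skeletal, hypothetical sketch of such a custom SMT; the lookup (here just an in-memory map) and the envelope parsing are assumptions and would need to be replaced by something durable and a real JSON parser:

```java
// Hypothetical sketch of a custom single message transform that sets the record
// key to a business key. For deletes (no "after" state in the Debezium MongoDB
// envelope) it falls back to a lookup keyed by the original _id.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

public class RekeyByBusinessKey<R extends ConnectRecord<R>> implements Transformation<R> {

    // Simplistic in-memory map standing in for a durable lookup (e.g. a compacted
    // topic or an external store); it only survives as long as the task does.
    private final Map<Object, String> idToBusinessKey = new ConcurrentHashMap<>();

    @Override
    public R apply(R record) {
        if (record.key() == null) {
            return record;                                   // nothing to re-key on
        }
        String businessKey = extractBusinessKey(record.value());
        if (businessKey != null) {
            idToBusinessKey.put(record.key(), businessKey);  // remember _id -> externalId
        } else {
            businessKey = idToBusinessKey.get(record.key()); // delete: look up the mapping
        }
        if (businessKey == null) {
            return record;                                   // unknown _id, pass through as-is
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.STRING_SCHEMA, businessKey,
                record.valueSchema(), record.value(), record.timestamp());
    }

    // Placeholder: parse the Debezium envelope's "after" JSON string and return
    // its "externalId" field; it is null for delete events, which have no "after".
    private String extractBusinessKey(Object value) {
        return null; // parsing intentionally omitted in this sketch
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```

Because the SMT runs before the producer partitions the record, the new key determines the partition, which is what would give the ordering behaviour described above.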

mottish commented 4 years ago

Hi @hpgrahsl,

@gunnarmorling answered the question about the ordering:

> But you'd still have the potential ordering issue in the DELETE/CREATE use case; any implementation which changes the message id from the original document id to that other id only after the fact will be prone to this, as the incoming events with the original ids may end up in different partitions.

Hi @gunnarmorling and thank you for the answer. If I understand the solution suggested in the blog post, a Kafka Streams pipeline will read the topic DBZ is writing to and write to a new topic where my business key is the message key. In a multi-partition topic, I believe the ordering can't be guaranteed, as DBZ uses the _id as the key.

What may be feasible, but with a risk of a race condition, is to have a Kafka Streams app reading the DBZ topic while an SMT changes the key and, for delete operations, calls the state store API to fetch the business key. In this solution, it is important that the Kafka Streams app has the _id <-> business key mapping before the delete operation arrives at the connector and the SMT is applied.

gunnarmorling commented 4 years ago

Yes, exactly, the solution from the blog post won't address your ordering concern. What would work is this:

> if you indeed had a custom SMT which, in case of a delete, obtains the full document state from that state store. That way you could emit the change events with the right key from the beginning. Ordering of events would be as per the ordering of the transactions in the oplog.

Then the messages will have the right key from the get-go and there's no risk of re-ordering. But that comes at the price of a state store lookup right from within the DBZ connector (via the SMT).