hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

Data loss or inconsistent data if multiple operations on the same document fall in the same batch #98

Closed victorgp closed 5 years ago

victorgp commented 5 years ago

This connector performs unordered bulk operations (https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkTask.java#L54) for better performance, but this can lead to inconsistent results and/or data loss when combined with a batch size (the `mongodb.max.batch.size` property) higher than 1.

A bulk operation with `ordered` set to `false` doesn't guarantee that the writes in the batch are applied in order. If multiple operations on the same document fall in the same batch, they can be applied in the wrong order, resulting in inconsistent data or data loss.

Let's look at an example:

Topic A contains messages from a MongoDB collection, and in this collection there is a document with ID 123 whose messages go to partition 5 of topic A. All operations on document 123 go to the same partition, so the correct order is preserved for consumption. We start consuming the topic with this connector and set `mongodb.max.batch.size=100`. It turns out that the document with ID 123 gets this sequence of operations:

insert({_id:123, 'foo':'bar'})
update({_id:123, 'foo':'bar2'})
update({_id:123, 'foo':'bar3'})

We are unlucky: they fall in the same batch, the batch is written as an unordered bulk operation, and MongoDB ends up applying the operations in the following order:

update({_id:123, 'foo':'bar3'})
update({_id:123, 'foo':'bar2'})
insert({_id:123, 'foo':'bar'})

As you can imagine, the result is not what we were expecting.

**Solution:** set `ordered=true`. Performance is sacrificed, but we gain full consistency. I believe (pending some tests) that this is still better than not doing bulk operations at all.
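The hazard can be sketched with a small in-memory simulation (hypothetical illustrative code, not the connector's actual write path). A dict stands in for the MongoDB collection, and `update` mimics an update without upsert, i.e. a no-op when the document does not exist yet, which is how the updates-before-insert reordering above silently loses data:

```python
def apply_ops(ops):
    """Apply (kind, doc) operations to an empty in-memory 'collection'."""
    collection = {}
    for kind, doc in ops:
        _id = doc["_id"]
        if kind == "insert":
            collection[_id] = dict(doc)
        elif kind == "update" and _id in collection:
            # Mimics an update without upsert: no-op if the doc is missing.
            collection[_id].update(doc)
    return collection

# Operations in produced order (Kafka preserves this within a partition).
produced_order = [
    ("insert", {"_id": 123, "foo": "bar"}),
    ("update", {"_id": 123, "foo": "bar2"}),
    ("update", {"_id": 123, "foo": "bar3"}),
]

# One possible execution order of an *unordered* bulk write.
reordered = list(reversed(produced_order))

print(apply_ops(produced_order))  # ordered=true keeps this order -> foo == 'bar3'
print(apply_ops(reordered))       # both updates hit a missing doc -> foo == 'bar'
```

With `ordered=true` the driver applies the batch exactly in the given order, so the first result is what you always get; with `ordered=false` any interleaving, including the second one, is possible.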

hpgrahsl commented 5 years ago

@victorgp thx for reporting this. 1) It is a known issue and will be addressed in the next patch release, i.e. 1.3.2, and 2) it is already fixed in the official connector, which is the recommended alternative anyway, so take a look at that repo as well in case you haven't already: https://github.com/mongodb/mongo-kafka/ The reason it was done with unordered bulk writes is that the project originally targeted insert-driven workloads only, where ordering is not really an issue.

hpgrahsl commented 5 years ago

resolved by #99 THX @victorgp!

victorgp commented 5 years ago

Thanks!

I was aware of the MongoDB work but I didn't know they had already released it. I will use that one.