mcollina / mqemitter-mongodb

MongoDB based MQEmitter

fix: ensure packets are emitted in order #24 #26

Closed · robertsLando closed this 4 years ago

robertsLando commented 4 years ago

Fixes #24

robertsLando commented 4 years ago

@mcollina Now the test fails, as expected.

robertsLando commented 4 years ago

@mcollina I have a question: if I call emit multiple times, should messages be delivered in order even if I don't wait for the emit callback before each emit?

mcollina commented 4 years ago

> I have a question: if I call emit multiple times, should messages be delivered in order even if I don't wait for the emit callback before each emit?

In theory, yes, I would expect that behavior.
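To make the scenario concrete, here is a minimal sketch of the case under discussion, using the public mqemitter-mongodb API (the connection URL and payloads are illustrative only):

```js
const mqemitterMongoDB = require('mqemitter-mongodb')

const mq = mqemitterMongoDB({ url: 'mongodb://127.0.0.1/mqemitter' })

mq.on('hello', function (message, done) {
  // The expectation under discussion: payloads arrive as msg 0, msg 1, msg 2.
  console.log(message.payload)
  done()
})

// Fire several emits back to back, without waiting for each callback.
for (let i = 0; i < 3; i++) {
  mq.emit({ topic: 'hello', payload: 'msg ' + i }, function (err) {
    if (err) throw err
  })
}
```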

robertsLando commented 4 years ago

@mcollina OK, so the test was right in its previous form: if I wait for the callbacks it works, but if I don't it fails, and that is exactly the case in aedes. It calls emit in queue order, but sometimes one message is delivered before an earlier one, and that's why aedes misses some packets: when this happens the packet is deduped and discarded. I think the leak may not be in the code we pointed out.

Before calling a callback, we should ensure that all previous callbacks have been called.
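One generic way to express that constraint, as a sketch only (this is not the patch in this PR): register callbacks in call order and release each one only when everything registered before it has completed.

```js
// Illustrative helper: callbacks fire strictly in registration order,
// even if the underlying operations complete out of order.
function orderedCallbacks () {
  const queue = []

  function flush () {
    // Release only from the head of the queue.
    while (queue.length > 0 && queue[0].args) {
      const head = queue.shift()
      head.cb.apply(null, head.args)
    }
  }

  return function wrap (cb) {
    const entry = { cb: cb, args: null }
    queue.push(entry)
    return function done () {
      entry.args = Array.prototype.slice.call(arguments)
      flush()
    }
  }
}

// Hypothetical usage:
//   const wrap = orderedCallbacks()
//   collection.insertOne(obj, wrap(cb)) // cb waits for all earlier callbacks
```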

mcollina commented 4 years ago

> OK, so the test was right in its previous form: if I wait for the callbacks it works, but if I don't it fails, and that is exactly the case in aedes. It calls emit in queue order, but sometimes one message is delivered before an earlier one, and that's why aedes misses some packets: when this happens the packet is deduped and discarded. I think the leak may not be in the code we pointed out.

I do not understand how this could happen.

robertsLando commented 4 years ago

@mcollina Check the tests now. I think the problem is here: https://github.com/mcollina/mqemitter-mongodb/blob/master/mqemitter-mongodb.js#L137

Here we emit packets every time we receive one from the stream, but the stream outputs packets once they are successfully stored in MongoDB, not in the order insertOne is called.
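Boiled down, the pattern in question looks roughly like this (a hedged sketch, not the module's exact code; function names are placeholders):

```js
// Each emit() inserts the packet; MongoDB may acknowledge and store two
// concurrent insertOne calls in either order.
function emit (collection, packet, cb) {
  collection.insertOne(packet, cb)
}

// A tailable cursor on the capped collection then replays documents in
// storage order, and that is the order subscribers see, regardless of
// the order emit() was called in.
function startDispatching (collection, emitPacket) {
  const stream = collection
    .find({}, { tailable: true, awaitData: true })
    .stream()

  stream.on('data', emitPacket)
}
```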

Also, I think the test I have added should go into the abstract tests of mqemitter.

robertsLando commented 4 years ago

I have opened a PR on mqemitter: https://github.com/mcollina/mqemitter/pull/34

robertsLando commented 4 years ago

@mcollina This approach might be a performance killer (or maybe not), but with this change the leak problem is fixed!

robertsLando commented 4 years ago

The only problem I have found here is that when the 'leak' happens I have to call cb immediately here:

https://github.com/mcollina/mqemitter-mongodb/pull/26/files#diff-614d4886bb4d8e8781639ee81936918cR146

Do you have any suggestions? Without that the process cannot continue, but with this approach the cb passed to the emitted event is useless.

robertsLando commented 4 years ago

> I would recommend using fastq instead.

I don't know how that could be used here. How can I do a find in that queue? Also, isn't that queue meant for functions? I'm using it to store packets.
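For reference, fastq is a worker queue: tasks are pushed and processed by a worker with a fixed concurrency, so with concurrency 1 the task callbacks complete strictly in push order. One possible reading of the suggestion (a sketch, not the code in this PR):

```js
const fastq = require('fastq')

// `collection` is assumed to be an already-open MongoDB collection.
function makeOrderedEmitter (collection) {
  // Concurrency 1: inserts run one at a time, so their callbacks
  // fire in the same order the packets were pushed.
  const queue = fastq(function worker (packet, done) {
    collection.insertOne(packet, done)
  }, 1)

  return function emit (packet, cb) {
    queue.push(packet, cb)
  }
}
```

The trade-off is that every insert waits for the previous one, which satisfies the ordering requirement but serializes the writes.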

robertsLando commented 4 years ago

I'm just thinking that this may not work in a cluster environment and could still have some leaks; let me check whether it can be fixed using writeConcern.

robertsLando commented 4 years ago

I have tried to use:

```js
// w: 0 requests no write acknowledgement, j: false skips journal
// confirmation, and wtimeout caps how long to wait for the write concern.
this._collection.insertOne(obj, { w: 0, j: false, wtimeout: 100 }, function (err, res) {
  // ...
})
```

without success: under high load it still happens that two consecutive inserts are not stored in consecutive order (and are therefore not emitted in order by the stream).

So at the moment this fixes the problem, but it will persist when running in cluster environments.

mcollina commented 4 years ago

Have you tried using w: 1?

robertsLando commented 4 years ago

@mcollina That's the default value, but with or without it nothing changes. Honestly, I don't know if there is a solution to this (in clusters).

I cannot find anything about ways to ensure that documents are written in the same order insertOne is called.

mcollina commented 4 years ago

I don't think there is a problem in a cluster, because the dedupe logic in aedes is per single Node.js process.

robertsLando commented 4 years ago

@mcollina You're right, I missed that :confounded: OK, let me check why it's failing now.

robertsLando commented 4 years ago

@mcollina It should be ready now; I have also fixed the callback logic :tada:

robertsLando commented 4 years ago

@mcollina Would you also mind releasing a new version of mqemitter so I can update it here and remove the test?

robertsLando commented 4 years ago

I'm thinking again about https://github.com/mcollina/mqemitter-mongodb/pull/26#issuecomment-684829044

We have 2 aedes instances:

  1. aedes1-mq1
  2. aedes2-mq2

In this case the bug is still present, as different instances may receive packets in a different order. (This also explains why the MongoDB tests in aedes-tests were flaky.)

PS: I have already tested this with aedes-tests clusters and it works; it is definitely better than before, but still not perfect.

robertsLando commented 4 years ago

The only solution to this could be a shared queue, i.e. another collection that keeps track of the emitted packet order, but this would slow us down a lot, as the reads/writes in Mongo would be doubled.
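A rough sketch of that idea, purely hypothetical and not implemented in this PR: a second collection acts as a shared counter, and every instance stamps each packet with a cluster-wide sequence number before inserting it. The extra round trip per packet is exactly the doubling of reads/writes mentioned above. The collection names below are made up.

```js
// Hypothetical shared-counter approach (not the code in this PR).
function emitWithSequence (db, packet, cb) {
  // Atomically reserve the next cluster-wide sequence number.
  db.collection('counters').findOneAndUpdate(
    { _id: 'packets' },
    { $inc: { seq: 1 } },
    { upsert: true, returnOriginal: false },
    function (err, res) {
      if (err) return cb(err)
      packet._seq = res.value.seq
      // Consumers would then deliver packets ordered by _seq.
      db.collection('pubsub').insertOne(packet, cb)
    }
  )
}
```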

robertsLando commented 4 years ago

@mcollina ping

robertsLando commented 4 years ago

@mcollina Done. Anyway, as I explained in https://github.com/mcollina/mqemitter-mongodb/pull/26#issuecomment-684933211, I think the bug will persist in a cluster environment.

robertsLando commented 4 years ago

@mcollina I have found the final solution :heart_eyes:. I completely refactored my previous changes and now use ordered bulk operations to ensure the write order. Performance is much better now, and this also works in clusters :tada: :rocket:
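For context, an ordered bulk operation in the MongoDB Node.js driver executes its writes sequentially, so packets queued into one bulk keep their insertion order. A minimal sketch of the primitive (illustrative only; the batching logic in the PR itself may differ):

```js
// Flush a batch of packets with an ordered bulk write, preserving emit order.
function flushPackets (collection, packets, cb) {
  const bulk = collection.initializeOrderedBulkOp()

  // Inserts added to an ordered bulk are executed in the order they were
  // added, which keeps the packets in the order emit() queued them.
  for (const packet of packets) {
    bulk.insert(packet)
  }

  bulk.execute(cb)
}
```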

robertsLando commented 4 years ago

Feel free to merge and release, Matteo, I'm done here 😊