thenativeweb / node-cqrs-eventdenormalizer

Node-cqrs-eventdenormalizer is a node.js module that implements the cqrs pattern. It can be very useful as eventdenormalizer component if you work with (d)ddd, cqrs, domain, host, etc.
http://cqrs.js.org/pages/eventdenormalizer.html
MIT License
38 stars 27 forks source link

Elasticseach "Too Many Requests" #73

Closed fabiomssilva closed 6 years ago

fabiomssilva commented 6 years ago

Hi,

This issue is related with issue #70

While replaying a large number of events to Elasticsearch I find that at certain point elasticsearch reply with: HTTP/1.1 429 Too Many Requests

Checking the issue I was able to find that all the 'POST' requests to elasticsearch are done at one point int time. I was able to count over 100 requests for every 10ms time period.

Checking the replay code I see that all the requests are done in:

cqrs-eventdenormalizer/lib/definitions/collection.js line 475-490

If we change this block to:

const concurrentOps = 5;
async.series([
  function (callback) {
    async.eachLimit(replVmsToDelete, concurrentOps, commit, callback); // changes
  },
  function (callback) {
    async.eachLimit(replVms, concurrentOps, commit, callback); // changes
  }
], function (err) {
  if (err) {
    debug(err);
  }
  self.replayingVms = {};
  self.replayingVmsToDelete = {};
  self.isReplaying = false;
  callback(err);
});

this will implement rate control on the requests. But would be good to be able to control the rate while instanciating the repository. (or any other nice place)

There is any possibility to implement this on the lib ?

Please let me know. Tnks --Fabio

adrai commented 6 years ago

hmmm.... not so easy

adrai commented 6 years ago

You can try to override the saveReplayingVms in userland first?

nanov commented 6 years ago

This defiantly sounds like a problem, it could be solved on driver side also ( some kind of batch ops mechanism is better anyway ).

I will inspect it deeper in the coming days as I don't have the time currently, a written test to test against would be in great help.

fabiomssilva commented 6 years ago

Was thinking a bit on the problem.

Question: Does it make sense to change it to something like:

... async.eachLimit(replVmsToDelete, 1, commit, callback); ... async.eachLimit(replVms, 1, commit, callback); ...

This will make the replay to take a bit longer (3-4 seconds more on my case)

Some benchmarking (with one event at a time, 5 at a time and uncontrolled. ): 1-Processed 21144 total events in 49.041s. Bye bye. 5-Processed 21144 total events in 45.281s. Bye bye. all same time (with errors) -Processed 21144 total events in 41.759s. Bye bye.

The reasoning behind this is: Because we send all the network requests all at the same time, network latency might make the events to arrive out of order in to the database servers.If we send one by one it will make sure they arrive in order. Does this make sense ?

--Fabio

nanov commented 6 years ago

I don't think the order is relevant as those are different VMs, this also means that the error occurs when there are a lot of VMs, and not events ( even thought VMs are resulted from events it is important to point it out ).

Btw, you could try playing with the "refresh" option in the elasticsearch6 driver ( ie repository ) options, it is 'true" by default, maybe you can see what happens if it is set to refresh: 'wait_for', or the other way around if it is already true.

Again, best way to solve this would be with batch operations ( not only for ES btw. for all DBs that support such operations ) on the driver side, this way it will be way more effective and safe.

adrai commented 6 years ago

i.e. this for mongodb? https://docs.mongodb.com/manual/core/bulk-write-operations/

adrai commented 6 years ago

fyi: I've just extended the viewmodel module to be able to make a bulkCommit: https://github.com/adrai/node-viewmodel#some-implementations-support-bulkcommit inmemory and mongodb implementations are already done. Feel free to add the implementation for elasticsearch => for reference you can take a look here: https://github.com/adrai/node-viewmodel/blob/master/lib/databases/inmemory.js

With this change now cqrs-eventdenormalizer is able to call the more performant bulk function: https://github.com/adrai/node-cqrs-eventdenormalizer/commit/d853967de772c58ffeb125e1a64babd5b71edeb5

nanov commented 6 years ago

This is great, exactly what i had in mind!

I am on the ES implementation!

fabiomssilva commented 6 years ago

Hi,

You need any help ? I have a consistent way to make it happen. I can give a help in the tests. Or if you need help on the implementation side I can try to take a look.

-fS

adrai commented 6 years ago

elasticsearch6 implementation included in v1.14.3 thanks to @nanov

nanov commented 6 years ago

Still there is some work to be done in order to assure writes in case of a huge amount of vms and provide more accurate errors.

First step would be to write a test for 1000+ Vms bulkcommit and take it from there.

fabiomssilva commented 6 years ago

Hi Gents, :)

I try the change and I got into trouble.

I debugged a bit and it looks like when a bulk operation is made with a empty array of vms it fails in Mongo and in Elasticsearch. I added a check (see bellow --1--) for empty arrays and it looks like working now. Does my code make sense ?

After doing this all runs but I started getting "ConcurrencyError" in some events. I could use some advice on this one :)

There was also a typo on this function vm -> vms (check --2--)

--1-- async.series([ function (callback) {

    if (self.repository.bulkCommit) {
      if (replVmsToDelete.length > 0) {
        if (replVmsToDelete.length === 0) {
          callback()
          return;
        }
        bulkCommit(replVmsToDelete, callback);
        return;
      }
    }
    async.each(replVmsToDelete, commit, callback);
  },
  function (callback) {
    if (self.repository.bulkCommit) {
      if (replVms.length === 0) {
        callback()
        return;
      }
      bulkCommit(replVms, callback);
      return;
    }
    async.each(replVms, commit, callback);
  }
], function (err) {
  if (err) {
    debug(err);
  }
  self.replayingVms = {};
  self.replayingVmsToDelete = {};
  self.isReplaying = false;
  callback(err);
});

--2--

function bulkCommit (vms, callback) {
  self.repository.bulkCommit(prepareVmsForBulkCommit(vms), function (err) {
    if (err) {
      debug(err);
      debug(vms); // Typo was here. 
    }
    callback(err);
  });
}
adrai commented 6 years ago

try v1.14.4

fabiomssilva commented 6 years ago

Hi,

Tnks. We are checking :)

So far so good :)

Tnks --fS

nanov commented 6 years ago

@fabiomssilva, you have mentioned you get some ConcurrencyErrors, is that an issue with the implementation or the errors were right?

I did some testing during the weekend, and there are two things that i want to implement this week in order to make the driver more stable in high concurrency scenarios.

One would be to set a maximum bulk operation size, and when exceeded to split those into chunks.

The second would be to implement some sort of buffering mechanism for the normal event handling, this way the driver will be able to handle a huge amount of concurrent events. They way I thought this could be implemented is by settings a max capacity and a timeout, and then the operations will be executed ( in a bulk manner ), when then capacity is reached OR the timeout is exceeded.

fabiomssilva commented 6 years ago

Hi,

From what I can understand it is fixed with the latest patch. I dont see any problems anymore.

Tnks a lot. --fS