thenativeweb / node-cqrs-eventdenormalizer

Node-cqrs-eventdenormalizer is a node.js module that implements the cqrs pattern. It can be very useful as an eventdenormalizer component if you work with (d)ddd, cqrs, domain, host, etc.
http://cqrs.js.org/pages/eventdenormalizer.html
MIT License

ConcurrencyError on replay #78

Closed blissi closed 5 years ago

blissi commented 5 years ago

Hello @adrai,

On startup, my denormalizer application gets the last processed event, then fetches all newer events and replays them.

The code is basically like this:

const lastEvent = await denorm.getLastEvent();
const lastEventTimeStamp = new Date(lastEvent.occurredAt);
// fetch up to 1000 events since the last processed one
const eventsSinceLastEvent = await denorm.getEventsSince(lastEventTimeStamp, 0, 1000);
// keep only events strictly newer than the last processed one
const eventsToReplay = eventsSinceLastEvent.filter(ev => ev.payload.occurredAt > lastEventTimeStamp);
await denorm.replay(eventsToReplay);

Now sometimes the replay fails with a ConcurrencyError. I dug through the code and saw that the exception occurs in your ViewModel library: it performs a conditional update - only if the _hash is different. To me it seems as if the event has already been written to the DB and therefore there is no hash change.

Until now my application has caught exceptions at startup, logged them and then quit. Because of this, the app always stops and never gets to run properly.

Now my question: is this an expected situation that I should simply ignore, i.e. catch the ConcurrencyError and proceed? Or is there something wrong with my replay code?

Unfortunately, at the moment I don't have a clue what is causing this behavior. I had the denormalizer application running in my debugger, then made some changes and restarted it. But I made sure that no events were being processed when I restarted it; the app was idle.

Thanks for your help, Steven

adrai commented 5 years ago

Never had such an issue... perhaps there are gaps in the events you're replaying? Would be nice to have a reproducible example...

nanov commented 5 years ago

A reproducible example would indeed be a great help.

Is your error thrown somewhere here?

The _hash in the mongodb implementation is used for optimistic concurrency control; this means the update will occur only if the hash is the same.
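Roughly, the pattern looks like this (an illustrative sketch using the plain mongodb driver, not the library's exact code):

// Optimistic-concurrency commit: the update only matches while the stored
// _hash is still the one we read before changing the view model.
async function commitViewModel(collection, vm, expectedHash, newHash) {
  const result = await collection.updateOne(
    { _id: vm.id, _hash: expectedHash },                          // id AND the expected hash
    { $set: Object.assign({}, vm.attributes, { _hash: newHash }) }
  );
  if (result.matchedCount === 0) {
    // no document matched: it was deleted, or another writer already changed
    // _hash in the meantime -> this is what surfaces as a ConcurrencyError
    throw new Error('ConcurrencyError');
  }
}

So if something else (for example live handling of the same event) committed in between, the stored hash no longer matches, the conditional update affects nothing, and that is reported as a ConcurrencyError.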

Are you sure you are performing replay before accepting new events for handling?

blissi commented 5 years ago

@nanov The error is thrown in this line:

if ((result.modifiedCount + result.upsertedCount) < modifiedCount) return callback(new ConcurrencyError());

modifiedCount was around 20, but result.modifiedCount and result.upsertedCount were both 0. I'll try to find out the steps that lead to this behavior.

blissi commented 5 years ago

@nanov I almost missed your last sentence (I just got up): that could indeed be the cause of the problem!

My startup code is this:

...
this.queues.shippingDenormalize.listen(ev => this.onEventReceivedOnBus(ev));
this.queues.contactDenormalize.listen(ev => this.onEventReceivedOnBus(ev));

this.denormalizer.onEventMissing((info, ev) => this.onEventMissing(info, ev));
this.denormalizer.onEvent(evt => this.onEvent(evt));
this.denormalizer.onNotification(noti => this.onNotification(noti));

await this.replayAllMissedEventsOnStartup();
...
nanov commented 5 years ago

You should debug what happens here.

More specifically, to see what the _hash in the viewmodel object is versus the actual hash in the db.

Your replay procedure should indeed be called before you start listening for new events, otherwise strange things might happen (is your queue persistent?).

adrai commented 5 years ago

Try to buffer the new events coming from the bus while replaying... as soon as the replay is finished, handle the buffer and continue to listen normally for new events...
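Something along these lines (an illustrative sketch; queue, denormalizer and replayAllMissedEvents stand in for your own objects):

async function start() {
  const buffer = [];
  let replaying = true;

  // start listening right away, but park anything that arrives mid-replay
  queue.listen((evt) => {
    if (replaying) {
      buffer.push(evt);            // hold the event until the replay is done
    } else {
      denormalizer.handle(evt);    // normal live handling
    }
  });

  await replayAllMissedEvents();

  // drain everything that queued up during the replay...
  while (buffer.length) {
    denormalizer.handle(buffer.shift());
  }
  // ...then continue handling live events normally
  replaying = false;
}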

blissi commented 5 years ago

@nanov Yes, the queue is persistent. @adrai I see that I tried exactly that, but with a hole in the implementation:

private onEventReceivedOnBus(ev) {
    // commandRejected events are handled immediately, even before the replay has finished
    if (ev.event === cqrs.commandRejected) {
        this.handle(ev);
    }
    // everything else is handled directly only once initialization (incl. replay) is done...
    else if (this.isFullyInitialized) {
        this.handle(ev);
    }
    // ...otherwise it is parked and handled from the buffer later
    else {
        this.parkedEvents.push(ev);
    }
}

-> That's the reason why it occurs rather seldom: I think only when a commandRejected event has occurred and I restart the denormalizer.

But given that the queues are persistent, I won't need to park the events at all, right?

nanov commented 5 years ago

Buffering can be handled on the queue level (as in your case). You should be good to go if you start consuming the queue after you are done with the replay.

EDIT

You will most probably get an AlreadyDenormalizedError afterwards if everything works as expected, as your events should already be in the queue. One solution would be to ignore those.
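Put together, the startup could look roughly like this (a sketch that assumes the denormalizer's handle(evt, callback) signature and an error whose name is 'AlreadyDenormalizedError'; check what your version actually reports, and handleFromBus is just an illustrative helper):

async function startup() {
  // 1. replay everything that was missed BEFORE consuming the persistent queues
  await replayAllMissedEventsOnStartup();

  // 2. only now start consuming; anything the replay already covered will come
  //    back as "already denormalized" and can simply be dropped
  queues.shippingDenormalize.listen(evt => handleFromBus(evt));
  queues.contactDenormalize.listen(evt => handleFromBus(evt));
}

function handleFromBus(evt) {
  denormalizer.handle(evt, (err) => {
    if (err && err.name === 'AlreadyDenormalizedError') return;  // expected after a replay
    if (err) console.error(err);
  });
}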

blissi commented 5 years ago

Ok, thank you both for your great suggestions!

dmitry-textmagic commented 4 years ago

Hello. We ended up with almost the same process as @adrai suggested. However, I think that correct replays are impossible when we have a shared revisionStore for both active and warming-up view collections. Here is the use case:

  1. We have 2 view collections in eventDenormalizer. They were started from the very beginning, so they handled all events.
  2. Next, we need to add a new (third) view collection, tied to this eventDenormalizer. After startup, it is obviously empty and needs to be warmed up by replaying old events.
  3. If we try to do it by replaying events first (using replayStreamed), then the "done()" function (https://github.com/adrai/node-cqrs-eventdenormalizer/blob/master/lib/replayHandler.js#L243) will rewrite the current revisions to the last replayed event revision.

But in the meantime (while we are replaying old events for the 3rd view collection), our eventDenormalizer will receive new real-time events, which could update the 1st and 2nd collections and update the current revision in the revisionStore as well. After calling the "done()" function, the revision would then be reset back to the old one (according to the last replayed event), and the data of the 1st and 2nd view collections becomes invalid; we would actually need to replay them as well.

The only possible solution we came up with is to create a separate eventDenormalizer per view collection. As long as each eventDenormalizer has its own revisionStore, we are free to replay events without touching any of the other view collections.
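Roughly what we have in mind (option names like denormalizerPath, repository and revisionGuard follow the library README as we read it; the separate guard collection names are our assumption):

const denormalizer = require('cqrs-eventdenormalizer');

// one denormalizer instance (and therefore one revision guard store) per group
// of view collections that needs to be replayable on its own
const liveDenorm = denormalizer({
  denormalizerPath: __dirname + '/viewBuilders/live',    // the existing 1st + 2nd collections
  repository:    { type: 'mongodb', dbName: 'readmodel' },
  revisionGuard: { type: 'mongodb', dbName: 'readmodel', collectionName: 'revision_live' }
});

const warmupDenorm = denormalizer({
  denormalizerPath: __dirname + '/viewBuilders/warmup',  // the new 3rd collection
  repository:    { type: 'mongodb', dbName: 'readmodel' },
  revisionGuard: { type: 'mongodb', dbName: 'readmodel', collectionName: 'revision_warmup' }
});

The warm-up instance could then run its replay without its done() touching the live instance's revisions.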

Could you please advise: is it correct behavior to make a separate eventDenormalizer for every new view collection? Or is there just a flaw in our logic? Your help would be greatly appreciated.

adrai commented 4 years ago

What you are suggesting is possible, but so far I have solved this in another way: if a collection needs to be replayed, I create a whole new parallel setup, i.e. a new db or new collections and new denormalizer process(es). That way the old denormalizer process(es) can continue to handle the live events while the other one is replaying. As soon as the replay is finished, destroy the old process(es) and db or collections.
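Very roughly, the idea (the READMODEL_SUFFIX switch and the db naming are just an illustration of the approach, not library features):

// the old process keeps handling live events against the current read model;
// a new process is started with a different suffix, replays into its own
// db/collections, and only then do the readers get switched over
const suffix = process.env.READMODEL_SUFFIX || 'blue';    // flip to 'green' on cut-over

const denorm = require('cqrs-eventdenormalizer')({
  denormalizerPath: __dirname + '/viewBuilders',
  repository: {
    type: 'mongodb',
    dbName: 'readmodel_' + suffix                          // readmodel_blue / readmodel_green
  }
});

// once the new process has caught up, point the readers at the new db and
// destroy the old process(es) and collections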