thenativeweb / node-cqrs-eventdenormalizer

Node-cqrs-eventdenormalizer is a node.js module that implements the cqrs pattern. It can be very useful as eventdenormalizer component if you work with (d)ddd, cqrs, domain, host, etc.
http://cqrs.js.org/pages/eventdenormalizer.html
MIT License
38 stars 27 forks source link

onEventMissing handling and callback #87

Closed OrH closed 4 years ago

OrH commented 4 years ago

Hey,

When handling missing events using onEventMissing, the callback is not called which can cause issues with message acking/rejecting, is this correct? If yes, is this by design or can be extended to call the callback after handling the missing events?

Thanks!

OrH commented 4 years ago

I think I might have found my answer - https://github.com/adrai/node-cqrs-eventdenormalizer/blob/master/lib/revisionGuard.js#L218 After handling with the events with revision lower than the revision in the event, it should find the current event that caused the onEventMissing to trigger, and continue with it, is that correct?

adrai commented 4 years ago

Yes, there is no callback for it... it's designed like that. But there should not be any bigger issues with message acking/rejecting...

you should do something like this: pseudo code:

denormalizer.onEventMissing(function (info, evt) {
  // i.e. ignore when already replaying the whole readmodel...
  if (isInReplayMode) return;

  // grab the missing events, depending from info values...
  const aggregateRevision = info.aggregateRevision;
  const guardRevision = info.guardRevision || 1;
  const aggregateId = info.aggregateId;
  const aggregate = info.aggregate;
  const context = info.context;
  const concatenatedId = context + aggregate + aggregateId;

  const missReq = missingRequests[concatenatedId];
  if (missReq && missReq <= aggregateRevision) {
    console.warn('Missing events for "' + context + ':' + aggregate + ':' + aggregateId + '" already requested ==> skip!');
    return;
  }

  // for the above check...
  missingRequests[concatenatedId] = aggregateRevision;

  // and call handle...
  denormalizer.handle(missingEvent, function (err) {
    if (err) { console.log(err); }
  });

  console.log('Start replaying missing events for aggregate "' + context + ':' + aggregate + ':' + aggregateId + '" from revision "' + guardRevision + '"');
});
OrH commented 4 years ago

Hey Adrai, thanks for the quick response! I'll try it 👍 I just didn't understand what is the missingRequests for, is this for if another event will arrive with a smaller revision the the one triggered the onEventMissing?

adrai commented 4 years ago

the main task of it is to detect if an event is missed for some reason... transport problem, denormalizer crashed during denormalization, etc...

OrH commented 4 years ago

Got it. Thanks @adrai !