snatalenko / node-cqrs

CQRS backbone with event sourcing for Node.js
https://www.node-cqrs.org
MIT License
74 stars, 22 forks

Concurrency handling #2

Open TomKaltz opened 6 years ago

TomKaltz commented 6 years ago

If command handlers from two separate processes try to store events with the same aggregate version, is there any retry mechanism for the losing command?

snatalenko commented 6 years ago

No, since it's not the only possible approach. Currently we are using it in a system that allows simultaneous writes to different nodes. So events with the same aggregate ID/version are possible and allowed, while conflict resolution (if the change is actually conflicting) is handled within the aggregate and related projections.

If you need such a mechanism, I would recommend creating your own AggregateCommandHandler and adding that logic to the execute(c: TCommand) method. Then, if you are going to use the DI container, you can register your handler this way:

this.registerCommandHandler(container => new CustomAggregateCommandHandler({
    eventStore: container.eventStore,
    aggregateType: options => container.createInstance(MyAggregate, options),
    handles: MyAggregate.handles
}));

TomKaltz commented 6 years ago

Should there be an optional optimistic locking mechanism so that command handlers can get a better guarantee that they aren’t writing events based on inconsistent/contended aggregate state? Also, on the projection side, it seems that events arriving out of order could produce indeterminate state as well. I’m a CQRS/ES beginner, so I apologize if I misunderstand how it all works. Thanks for your insight!

snatalenko commented 6 years ago

It depends on your use case, event types, and the type of store you are using. In an environment with multiple masters, conflict resolution should happen after the events are committed to the storage, and those events are not always conflicting.

For example, simultaneous user profile modifications on different nodes are unlikely, but a user profile modification can happen on one node while the same user receives a connection invite on another. The user aggregate can "understand" that those events are not conflicting and accept/apply both of them. If the received events are conflicting, there should be a common algorithm that all of your nodes use for conflict resolution.
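One possible shape for such a common algorithm is a deterministic ordering of events that share an aggregate version, so that every node replays them in the same sequence regardless of arrival order. The sketch below is an illustration only, not node-cqrs code: the timestamp and nodeId fields and the orderEvents helper are hypothetical assumptions.

```javascript
// Hypothetical helper: gives every node the same replay order.
// Assumes each event carries a numeric `timestamp` and a string `nodeId`
// (neither field is defined by node-cqrs).
function orderEvents(events) {
    return [...events].sort((a, b) =>
        a.aggregateVersion - b.aggregateVersion ||
        a.timestamp - b.timestamp ||
        (a.nodeId < b.nodeId ? -1 : a.nodeId > b.nodeId ? 1 : 0));
}

// Two nodes receive the same events in a different arrival order.
const profileChanged = { type: 'profileChanged', aggregateVersion: 3, timestamp: 100, nodeId: 'node-a' };
const inviteReceived = { type: 'inviteReceived', aggregateVersion: 3, timestamp: 100, nodeId: 'node-b' };

const seenByNodeA = orderEvents([profileChanged, inviteReceived]);
const seenByNodeB = orderEvents([inviteReceived, profileChanged]);
```

Both nodes end up with the same sequence, so an aggregate or projection that applies the events in this order reaches the same state everywhere. Whether the later event should be applied, merged, or rejected is still a domain decision.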

If you are using a backend with a single master (like MongoDB), you can add a unique index on the events.aggregateId and events.aggregateVersion pair and handle concurrency errors in the AggregateCommandHandler: if the event commit fails (in the case of MongoDB, err.code === 11000 or err.name === 'ConcurrencyError'), restore the event stream and apply the command once again.
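The retry described above could look roughly like this. It is a minimal sketch under stated assumptions, not the library's implementation: InMemoryEventStore, executeWithRetry, maxAttempts, and the handle(pastEvents, command) function are hypothetical names, and the in-memory store only imitates a collection with a unique index by throwing an error with code 11000.

```javascript
// Hypothetical in-memory store that imitates a unique index on
// (aggregateId, aggregateVersion). With the MongoDB Node.js driver the real
// index could be created with:
//   db.collection('events').createIndex(
//       { aggregateId: 1, aggregateVersion: 1 }, { unique: true });
class InMemoryEventStore {
    constructor() {
        this.events = [];
    }

    async getAggregateEvents(aggregateId) {
        return this.events.filter(e => e.aggregateId === aggregateId);
    }

    async commit(newEvents) {
        for (const e of newEvents) {
            const duplicate = this.events.some(x =>
                x.aggregateId === e.aggregateId &&
                x.aggregateVersion === e.aggregateVersion);
            if (duplicate) {
                const err = new Error('duplicate aggregateId/aggregateVersion');
                err.code = 11000; // code MongoDB reports for duplicate key errors
                throw err;
            }
        }
        this.events.push(...newEvents);
    }
}

const isConcurrencyError = err =>
    err.code === 11000 || err.name === 'ConcurrencyError';

// Re-reads the event stream and re-runs the command when a commit loses a
// version race. `handle` is an assumed pure function returning new events.
async function executeWithRetry(store, handle, command, maxAttempts = 3) {
    for (let attempt = 1; ; attempt++) {
        const pastEvents = await store.getAggregateEvents(command.aggregateId);
        const newEvents = handle(pastEvents, command);
        try {
            await store.commit(newEvents);
            return newEvents;
        } catch (err) {
            // Give up on unrelated errors or when the attempts are exhausted.
            if (!isConcurrencyError(err) || attempt >= maxAttempts) throw err;
        }
    }
}
```

In a custom AggregateCommandHandler, the same loop would sit inside execute, with the aggregate restored from the event stream on every attempt so that the command is always validated against the latest state.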