johanhaleby / occurrent

Unintrusive Event Sourcing Library for the JVM
https://occurrent.org

Support for "read your own writes" #84

Closed rickardoberg closed 3 years ago

rickardoberg commented 3 years ago

To build something like a REST API on top of event sourcing, you need support for "read your own writes". The current Reactor API does not support this because the result of a write is Mono<Void>. It should instead return something that can hold the transactional id of the write, so that a subsequent read can wait until the read model subscriber has seen that transactional id.

The simplest fix would be to return Mono<someattributebucketgoeshere>, so that the client can get the id and then coordinate with the read model subscriber, and not try to read the database until the events have been applied. I don't see how it would be feasible to create any REST API on top of Occurrent with reasonable semantics without this, unless I am missing something.
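A minimal sketch of the coordination described above, assuming the write returns a monotonically increasing position (for example a stream version). `ReadModelTracker` and its methods are hypothetical names used for illustration and are not part of Occurrent.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not an Occurrent class): lets a reader wait until the
// read model subscriber has applied events up to a given position.
class ReadModelTracker {
    private long appliedVersion = 0;
    private final NavigableMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();

    // Called by the read model subscriber after applying events up to `version`.
    synchronized void markApplied(long version) {
        if (version <= appliedVersion) {
            return;
        }
        appliedVersion = version;
        // Complete and drop every waiter whose target has now been reached.
        NavigableMap<Long, CompletableFuture<Void>> reached = waiters.headMap(version, true);
        reached.values().forEach(future -> future.complete(null));
        reached.clear();
    }

    // Called by the reader with the position returned from the write.
    synchronized CompletableFuture<Void> awaitApplied(long version) {
        if (version <= appliedVersion) {
            return CompletableFuture.completedFuture(null);
        }
        return waiters.computeIfAbsent(version, v -> new CompletableFuture<>());
    }
}

class ReadYourWritesDemo {
    public static void main(String[] args) throws Exception {
        ReadModelTracker tracker = new ReadModelTracker();
        long writtenVersion = 3; // assumed to come back from the write

        CompletableFuture<Void> ready = tracker.awaitApplied(writtenVersion);
        System.out.println(ready.isDone()); // false: nothing applied yet

        tracker.markApplied(2);
        System.out.println(ready.isDone()); // false: read model is still behind

        tracker.markApplied(3);
        ready.get(1, TimeUnit.SECONDS);     // returns immediately, safe to read now
        System.out.println(ready.isDone()); // true
    }
}
```

A REST handler could wait on the returned future with a timeout before querying the read model, and fall back to an error or a retry hint if the subscriber does not catch up in time.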

johanhaleby commented 3 years ago

Hi Rickard!

There are workarounds for this, since you can add any (cloud event) extension you like to the events persisted to the event store (when you map your domain events to cloud events). So you could add your own transaction id when writing and then store that in your projections. This is one thing I like about cloud events: they are extensible.
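A rough sketch of that workaround. To stay self-contained it uses a simplified stand-in `Event` class instead of the CloudEvents SDK types, and the extension name `transactionid`, `TransactionIdExample`, and `TransactionAwareProjection` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Simplified stand-in for a cloud event; a real application would use the
// CloudEvents SDK types and attach the extension while mapping domain events.
final class Event {
    final String id;
    final Map<String, String> extensions;

    Event(String id, Map<String, String> extensions) {
        this.id = id;
        this.extensions = Map.copyOf(extensions);
    }
}

class TransactionIdExample {
    // Hypothetical extension name. CloudEvents attribute names are limited to
    // lower-case letters and digits, hence no camel case or separators.
    static final String TX_EXTENSION = "transactionid";

    // Mapping step: return a copy of the event tagged with the transaction id.
    static Event withTransactionId(Event event, String transactionId) {
        Map<String, String> extensions = new HashMap<>(event.extensions);
        extensions.put(TX_EXTENSION, transactionId);
        return new Event(event.id, extensions);
    }

    public static void main(String[] args) {
        String transactionId = UUID.randomUUID().toString();
        Event tagged = withTransactionId(new Event("event-1", Map.of()), transactionId);

        TransactionAwareProjection projection = new TransactionAwareProjection();
        System.out.println(projection.hasApplied(transactionId)); // false
        projection.apply(tagged);
        System.out.println(projection.hasApplied(transactionId)); // true
    }
}

// Hypothetical projection that records which transaction ids it has applied.
class TransactionAwareProjection {
    private final Set<String> appliedTransactionIds = new HashSet<>();

    void apply(Event event) {
        String transactionId = event.extensions.get(TransactionIdExample.TX_EXTENSION);
        if (transactionId != null) {
            appliedTransactionIds.add(transactionId);
        }
        // ... update the actual view here ...
    }

    boolean hasApplied(String transactionId) {
        return appliedTransactionIds.contains(transactionId);
    }
}
```

The caller that performed the write keeps the transaction id and checks `hasApplied` before trusting the projection. In a real projection the id would be persisted alongside the view rather than held in memory.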

But regardless of this, I have been thinking of adding something like this to the API just like you imply, since it's so widely applicable. But I haven't really thought things through enough and I'd love to discuss what should be returned. My current thinking is that the new event stream version should be returned when calling write. This way you can return that from your REST API and you can query the projections to see if it's at the same version (or if you're OK with breaking a few ES rules, you could query the event store directly if you have the right indexes. Stream version and stream id are indexed by default so it should be efficient to fold over the event store up to a specific version using the EventStoreQueries API).
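As a sketch of the version check mentioned above, assume the projection stores the stream version it reflects next to the view. `VersionedView`, `VersionedProjection`, and `readAtLeast` are hypothetical names and are not part of Occurrent.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical read model entry: the view data plus the stream version it reflects.
final class VersionedView {
    final long streamVersion;
    final String data;

    VersionedView(long streamVersion, String data) {
        this.streamVersion = streamVersion;
        this.data = data;
    }
}

class VersionedProjection {
    private final Map<String, VersionedView> views = new ConcurrentHashMap<>();

    // Called by the subscriber; keeps whichever view has the higher stream version.
    void store(String streamId, long streamVersion, String data) {
        views.merge(streamId, new VersionedView(streamVersion, data),
                (current, candidate) ->
                        candidate.streamVersion > current.streamVersion ? candidate : current);
    }

    // Returns the view only if it reflects at least `minVersion`.
    Optional<VersionedView> readAtLeast(String streamId, long minVersion) {
        return Optional.ofNullable(views.get(streamId))
                .filter(view -> view.streamVersion >= minVersion);
    }
}

class VersionCheckDemo {
    public static void main(String[] args) {
        VersionedProjection projection = new VersionedProjection();
        projection.store("myStream", 1, "name=John");

        long writtenVersion = 2; // assumed to be returned by the write
        System.out.println(projection.readAtLeast("myStream", writtenVersion).isPresent()); // false

        projection.store("myStream", 2, "name=Jane");
        System.out.println(projection.readAtLeast("myStream", writtenVersion)
                .map(view -> view.data)
                .orElse("not yet")); // name=Jane
    }
}
```

A REST endpoint could accept the version returned by the write as a request parameter and respond with the view, or with a "not yet available" status when `readAtLeast` returns empty.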

Thinking out loud, would it make sense to change Mono<Void> to something like Mono<TransactionDetails> where TransactionDetails allows you to get things like the new stream version?

Currently Occurrent stores very little metadata, only the stream version and stream id. One thing that I think is very important to store is something like a "transaction id". Maybe also a sequence number (though I think a transaction id combined with the stream version gives you the same functionality?) as well as a "correlationId". The reason I haven't added this already is that I'd like to use cloud event standards as much as possible. I need to google around a bit to see if there's a standard way to represent a "transaction id" in cloud events; I haven't seen one yet. I think there are standards for distributed tracing and sequence numbers. But again, I'm not sure how much I should add by default and how much should be left to users to add themselves (at least in core).

Btw, thanks for the issue! :) It's one of the first and I'm really glad for your feedback. I know you have a lot of experience with ES/CQRS and I'm eager to hear your thoughts and ideas if you're willing to share them.

bartelink commented 3 years ago

> Thinking out loud, would it make sense to change Mono<Void> to something like Mono<TransactionDetails> where TransactionDetails allows you to get things like the new stream version?

I arrived at this overload signature for the Transact function - you sometimes want to do a version check prior to processing, in addition to emitting the version afterwards. You can hang things off the context like session tokens too

johanhaleby commented 3 years ago

@bartelink Thanks a lot for the links! Definitely need to check it out.

> you sometimes want to do a version check prior to processing, in addition to emitting the version afterwards.

Would you use it for something other than optimistic concurrency control?

bartelink commented 3 years ago

Yes, in the context of Equinox, the State is also in play - in some cases you want to project from the state including the events you just wrote.

But in the general case, depending on the store, you may want to equally expose a Version (for EventStore), or a SessionToken (for reading your writes in CosmosDB or Dynamo); the exact value depends on the store in question, but they'll have analogous purposes. If there was a need to expose the etag of the most recent version of the stream's Tip, it'd go here too (in Equinox that's not exposed, but is used internally in the caching mechanism)

Semi-related writeup regarding read-your-writes techniques from the Propulsion library

johanhaleby commented 3 years ago

I've taken an initial step to support this by adding a WriteResult that is returned when writing events to the event stores (instead of void). WriteResult currently contains the stream id and stream version, but it could be extended to include other things in the future if needed (including perhaps database-specific metadata that could be of interest). You should be able to use the stream version to check whether your projections are up to date.

I'm closing this issue for now, but if any of you have comments just let me know and we can re-open it :) For example, with the reactive API:

```java
Flux<CloudEvent> events = ...
Mono<WriteResult> result = eventStore.write("myStream", events);
var streamVersion = result.map(writeResult -> writeResult.getStreamVersion());
```

johanhaleby commented 3 years ago

@rickardoberg Published version 0.10.0 now that includes the changes described above.

rickardoberg commented 3 years ago

@johanhaleby Excellent!!