looplab / eventhorizon

Event Sourcing for Go!
Apache License 2.0
1.6k stars 195 forks source link

Save & publish event atomically #254

Closed yagotome closed 2 years ago

yagotome commented 4 years ago

What happens if a failure occurs between the time an event is saved to the event store and that event is published to the bus? According to this source code, it seems these operations aren't atomic, are they?

https://github.com/looplab/eventhorizon/blob/8ae04a04ec8f95a7db77d28a0fab5b563a7e2dc3/aggregatestore/events/aggregatestore.go#L142-L157

nirweiner2 commented 4 years ago

Hi. I am interested in a solution as well. I thought about multiple options to solve this one:

  1. Using a Debezium-like solution: Polling the store for changes (Inserts / Updates / Deletions) and produce matching events.
  2. Apply the outbox pattern: Additionally to storing the events in the event store, insert extra rows to a custom table that indicates what events were just added. Then, poll the extra data and send matching events.
  3. Use the same storage for store operations and for the eventbus. geteventstore.com can be used for this although it can be done with any other database as well.

What do you think

toesterdahl commented 4 years ago

Where this is a real issue, not every application is the same, an option could be to use a persistent reliable ordered store such as 'etcd' or Kafka. This store would implement both the eventstore and the eventbus. The store would need to implement Save and Load; Publish could be a void method.

nirweiner2 commented 4 years ago
  1. Doesn't the problem applies to all applications? Could you elaborate with an example of an application which doesn't care about it?
  2. It is not trivial to use Kafka as an eventstore to solve it for various reasons which described well here https://serialized.io/blog/apache-kafka-is-not-for-event-sourcing/
  3. If one is interested in using different tools for eventstore and eventbus it doesn't get an appropriate solution to do that.

I personally want to use this library with Kafka and MongoDB. The problem mentioned in this ticket prevents me from doing so.

I believe many other encounters this problem as well.

giautm commented 4 years ago

The right solution is publish events from database. It mean, using other tools like kafka connect to pull changes from eventstore and publish to kafka. Or using other application that act like a database slave to sync to kafka using binlog (mysql only).

maarek commented 4 years ago

This issue is also related to #248 where the outbox pattern was designed for atomic writes and the event stream off of the outbox.

If you use EventStore, then it has a mechanism for idempotent reads that works really well, this is used in Equinox (F#) by Jet/Walmart. MongoDB as your event store has a change stream that you could read your projections/pm/sagas around as well.

I would recommend Kafka not as an event store but if you want to handle distribution of your events off of the change feed such that EventHorizon -> MongoDB -> Change Feed Consumer -> Kafka or something similar.

maxekman commented 4 years ago

Really interesting discussion! I will do some reading about the MongoDB change stream, maybe that can be integrated in the event store as a "write confirmation", if the MongoDB guarantees can't be configured strict enough to make sure the event is written before publishing.

maarek commented 4 years ago

@maxekman It's not the guarantee that it was written but that the publisher has not lost messages I believe. Due to the nature having two independent systems, without 2PC, we can guarantee that only the first occurs but not the second. If writing to the database but then crashing before writing to the eventbus, then we've lost events. If we've written to the event bus but the service drops before end consumers have a chance to process the messages, then we've lost events. We want to guarantee that both a write to the database occurs as well as all consumers have had a chance to listen and process the events at least once for 0 data loss and that they have the ability to pick up where they left off when the system restarts.

maxekman commented 4 years ago

Ah, I see! And a pub/sub like the one on GCP with message acking will not be able to provide that? But I suppose you are more concerned with the publishing phase?

maarek commented 4 years ago

What happens when you publish to your DB but you are not able to reach the pub/sub? Do you back out of your database transaction? Now what happens when you try to back out of the database commit and the DB is not accessible? It's just like you're always chasing your tail to get the guarantees you want and this is the difficulty of 2 phase commits.

Having a database that handles the atomic guarantees at the DB layer means that your application only has to manage the guarantees of the chosen data store. Your DBs change feed if provided should be query-able from where the consumer left off so that your consumers does not lose messages in case of failure.

This is all about ensuring that the downstream consumers always receive the messages at least once and they're never lost to the void as that can be super hard to debug and figure out post facto.

PS. If your application can live without this guarantee, maybe your events are not so important, then this all isn't super important. But say that you're a financial company and your downstream operational systems need to know that a client deposited money in their account. Then that deposit event cannot be lost at the bus or you start increasing the complexity of other parts of the system to rectify errors.

giautm commented 4 years ago

I think this article will helpful in discussion: https://www.nginx.com/blog/event-driven-data-management-microservices/#Achieving-Atomicity

The following sections:

maxekman commented 4 years ago

Great input @maarek and @giautm! Let’s see if we can make this happen. Maybe we should try to merge the event store and bus interfaces to enforce drivers to implement this? What are your thoughts regarding an actual implementation?

maarek commented 4 years ago

This is going to be a little long and I apologize in advance.

@giautm The article essentially outlined what we've said so far. The two best approaches are the outbox pattern where atomic transactions between the outbox and the store are adhered to, or the transaction log method, relying on the mechanism of the database itself to handle reading the event. Each have their pros and cons and depending on the data store, varying levels of complexity.

@maxekman This depends on the approach taken. Prior art, such as Equinox, split off the projection/reaction mechanism from the event handlers in what they call propulsions. The idea being that the event handlers sole purpose is to manage the event store and invariant checks. This is the same as as Kafka Connectors where they Source -> Sink processors for Kafka topics to other stores. Decoupling the two allows you to scale each independently. If you couple the interfaces, so that we guarantee that the implementation is a matching set, does it also limit the flexibility in application?

Some more thoughts:

Implementation of the outbox pattern, and our ability to manage it, is determined by the complexity of managing another model in the chosen database and to easily read and mutate that model without conflict or race. That data store would require transaction capabilities against multiple tables or documents. We would need to consider the approach to polling such a system. Would there only be allowed 1 system to poll the outbox and publish to the bus due to needing to mark off the message as read? Does the bus then require durability to ensure that it does not lose messages and that multiple consumers can process the same message from there? What is the process that occurs when a new process manager is brought online? Does it only receive events from the end of the bus? There would be coordination of listening to the bus, then pulling the latest event stream and rectifying overlap.

With the transaction log method, we find that this is very specific to the store being used. What works for MongoDB, would be a different process all together for PostgreSQL. These stores don't often allow you to attach listeners to the beginning and listen for all events to current time. MongoDB has a finite period of time the change stream contains the events but it can be queries from where the consumer left off if the consumer drops for a short period of time. MongoDB in particular would allow for multiple consumers to consume, but there is no mechanism that would support keeping multiple consumers from processing the same event as in the outbox pattern.

My typical approach for either option would be a consumer from MongoDB to Kafka where events can be persisted for a configurable amount of time and multiple consumers can attach themselves from there. But this is another system in the middle and added complexity.

Lastly, we have data stores that are specific to Event Sourcing such as EventStore. These have the facilities for querying and reading back a stream often from any point of time in that stream. I started to implement a similar data store in Go called maarek/Aves but really haven't had time to work on it and it's more of a PoC to dig into the implementation of Event Sourcing specific datastore.

nirweiner2 commented 4 years ago

@maarek I want to suggest a different approach. On a system that uses events for communication, it is quite likely that it consumes events to run its commands. Assuming the system uses an at-least-once mechanism. In this case, if crashing occurs right after writing to the eventstore, the command would be executed again. If we save the command's ID beside the events which were created by this command, It is possible to query for this information and reproduce those events again.

maxekman commented 4 years ago

I have authored an approach to this by wrapping storing the events and publishing them in a DB transaction. Any feedback on #268 would be great!

maxekman commented 4 years ago

I'll pivot #268 to implement the outbox pattern using a separate outbox collection in MongoDB with a consumer listening to the change stream and handling the events (typically with an event bus, will prototype with GCP pub/sub).

gedw99 commented 3 years ago

Interesting discussion...

@maxekman regarding the outbox pattern...

Genji is a db like mongo but 100% golang. https://github.com/genjidb/genji its build on top of badger and is fast and stable. https://github.com/dgraph-io/badger Its provides SQL API as well as the underlying badgerdb api.

i use it for cqrs and the materialised view produced from the cqrs. this means the Materialised view can send any changes out to the clients or other microservices. you can then make a SQL query that is also a subscription :)

maxekman commented 3 years ago

@gedw99 Thanks! Interesting project, will definitely read more about it!

gedw99 commented 3 years ago

Glad to see this project and discussion.

for atomicity I have been using badger dB and it’s subscribe feature as both a event store and a read only dB for materialised views.

Now that to genji I can use sql queries for the materialised views.

the code is used in production

benched. and can do 10,000 writes /sec whilst doing snapshot streaming backups every 10 seconds.

https://github.com/genjidb/genji

We also use it as a file store with chunked files.

basically it’s mongodb

It’s might be a good target for event horizon because it’s pure golang and can be embedded ( memory or durable ) or can be called over grpc.

we use it with benthos where benthos reads the events and updates the read database.. again running all in a single binary or with bistribured systems

Changes to the read only materialised views in genji are then broadcast using NATS with SSE, grpc and COAP /MQTT adapters for varying client types.

maxekman commented 3 years ago

Thanks again for a detailed write up! I’ll definitely come back to this first half of next year. I have two clients that needs the added robustness.

screwyprof commented 3 years ago

Hi guys. I've got a pet project I created to play around with CQRS/ES. It is pretty close to what you do. I haven't committed for awhile, but I guess it's still relevant. https://github.com/screwyprof/cqrs

The reason I'm mentioning it is that I was thinking over the very same thing you discuss. Building your own EventStore which can be used in reliable system is not a simple thing. When I was looking into the problem I stumbled upon a project which addresses all the issues discussed above: https://www.eventstore.com.

Update: Looks like the site was updated and the new version was released. The Golang client can be found here https://github.com/EventStore/EventStore-Client-Go

PS: Their blog has some related articles https://www.eventstore.com/blog/articles

maxekman commented 3 years ago

Hi @screwyprof! Cool that you experiment.

Me and many other are aware of Eventstore DB, and it might be a good solution for some. It’s however a full system that you run similar to a traditional DB system, compared to Event Horizon which is a library to be used within other apps. It might be necessary to have a Event Horizon service at some point (thinking about the outbox here) but I would like to avoid it as long as possible. Hope my thoughts gives some context.

screwyprof commented 3 years ago

Hi @maxekman

What I meant is that the library could use EventStore as a storage and a bus. In that perspective it's not way too different from using a database + a queue :) However it could break the existing API contracts...

maxekman commented 3 years ago

Ah, now I understand 🙂 Yes that is a possibility. I will not implement it until there is a use case however. If you try it out let me know! I'm interested to hear about the experience!

nirweiner2 commented 3 years ago

@maxekman @screwyprof Actually, I have suggested this approach as the third option on my first comment: https://github.com/looplab/eventhorizon/issues/254#issuecomment-643676835

Unfortunately I want to remind the big drawback of this approach - it is not generic. We won't be able to adopt it in case the message bus and the message store uses different solution.

maxekman commented 3 years ago

Yes, exactly. I would like a solution that can stand on its own legs. I have some ideas which I’ll experiment with the coming weeks.

gedw99 commented 3 years ago

Why not us NATS ? It’s 100% self host and is HA that’s to the new NATS Jetstream using RAFT.

So then any event store of dB of anything just needs to publish a change event ( protobuf or Jain or basically any form of encoding ) out to nats and it guaranteed to get to any subscriber.

you can run nats embedded in any existing golang binary and it wil make sure the message goes up to the nats jetstream cluster.

nats to nats can use pure tcp and is screaming fast

you can even run it on a mobile

maxekman commented 3 years ago

I'm experimenting. 🙂

https://github.com/looplab/eventhorizon/pull/291

screwyprof commented 3 years ago

Hi @maxekman.

I've stumbled upon https://github.com/ThreeDotsLabs/watermill which can be useful. It can be used to do what you want. Here's an example: https://github.com/ThreeDotsLabs/watermill/tree/d7c8a3e553c34bcc0b4b20bfd2e05909a3865f36/_examples/real-world-examples/transactional-events

maxekman commented 3 years ago

Thanks for the info, looks like an interesting project. A Watermill event bus could definitely be interesting!

It doesn’t however, from what I can see, solve atomicity in our context (storing and publishing).

I have been experimenting with the same pattern, publishing within a transaction. That however has the problem of possibly having published the event and then failing to save it. Which is equally incorrect as saving and failing to publish. Both saving and publishing must be guaranteed, and possibly retried.

screwyprof commented 3 years ago

@maxekman I guess you're a little bit wrong here. They basically use the same idea as with Outbox pattern. They commit into an sql-backed event store, then the consumer reads the events and publishes to Kafka. So in this case there's no 2PC problem. In other words this action of storing/publish can be considered atomic in this context.

maxekman commented 3 years ago

Ah, yes, you are correct. They are in fact using an outbox pattern. Nice example.

I have intended to create an outbox structure coupled to a specific event store where the bus can be hooked on to (similar to the example).

I'll probably work on this soon, but I have a few other things I need to solve first (in client projects using EH).

maxekman commented 2 years ago

There is now an almost finished PR with an outbox implementation. Any feedback is welcome! 😄

maxekman commented 2 years ago

I think merging of #335 solves this issue, will close for now.