envato / event_sourcery

A library for building event sourced applications in Ruby
MIT License

Come up with approach for skipping side effects when an EventStreamProcessor is catching up #24

Open grassdog opened 8 years ago

grassdog commented 8 years ago

Bye bye clutch down mode. Let's come up with something else.

harukizaemon commented 7 years ago

At a minimum we:

In some cases (mostly due to simplicity/laziness/time constraints) we:

vonconrad commented 7 years ago

So I've been thinking a lot about this over the last couple of days, particularly as I've been following along the discussion in the Event Sourcery Slack channel. I thought I'd capture my thoughts.

1. Problem

We need a way to safely replay events through a Reactor without triggering side effects and/or emission of events back to the event stream. (For the purposes of this writeup, I'll use "event stream" to mean the single, global, append-only list of events in the system's event store.)

This problem does not apply to Projectors--these should be safe to replay as they currently are. If you can't rebuild a projection safely then there's likely something wrong.

2. Use cases

I thought it'd be helpful to describe the reasons we need a safe way to replay events through a Reactor without re-triggering side effects. There are likely more reasons than those enumerated below, but I feel these capture the main classes of reasons why safe replaying is desired:

2.1. Unintentional loss of tracker/bookmark

Each Reactor keeps a "bookmark" of where in the event stream it's up to. In EventSourcery, the convention is to store these "bookmarks" in a tracking table in the projections database. If anything happens to this database, we need to be able to re-determine the position of the "bookmark" so we know where the Reactor was up to.

In other words, this is a disaster recovery situation. Ideally, this would never be something we need to do, but in case it does happen, we should design our bookmarking feature so that bookmarks are disposable.

2.2. Intentional reset of tracker/bookmark

Sometimes it's handy to be able to reset the "bookmark" and have the Reactor start from scratch again.

In particular, this use case centres around being able to deal with evolving business logic in the Reactor. It's very likely (in fact, it's already happened in the Item Warehouse) that we'll need to make changes to a Reactor that involve re-processing the entire event stream without triggering side effects.

Example: Let's say we have a Reactor responsible for emailing orders to customers. For this Reactor to do its job, it subscribes to the order_placed event type. But because the order_placed event wouldn't have the customer's email address, it would also need to subscribe to email-related events so it can build up a reference table (also known as "internal projection") of customer email addresses. It ignores all other events.

Now imagine we get a new requirement to include the customer's first name in the email. Neither the order_placed event nor any of the email-related events has that information, so we need to subscribe to new event types that are related to customer names. We also need to add a column to our reference table for the customer's name. To populate this column, the Reactor needs to go back to the beginning of the event stream and process all the name-related events that it previously ignored.

In effect, the Reactor needs to go back to the beginning of time and re-process events without sending the order emails again.
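
To make the example a little more concrete, here's a rough Ruby sketch of such a Reactor. The class, event type names and collaborators are illustrative only, not EventSourcery's actual API:

# Illustrative sketch only -- not EventSourcery's actual API.
class OrderEmailReactor
  def initialize(email_sender:, customers_table:)
    @email_sender = email_sender   # side-effecting collaborator
    @customers = customers_table   # internal projection / reference table
  end

  def process(event)
    case event.type
    when 'customer_email_changed'
      # Build up the internal projection; no side effects here.
      @customers.upsert(event.aggregate_id, email: event.body['email'])
    when 'customer_name_changed'
      # New requirement: also record the customer's first name, which means
      # re-processing the stream to pick up events we previously ignored.
      @customers.upsert(event.aggregate_id, first_name: event.body['first_name'])
    when 'order_placed'
      customer = @customers.fetch(event.body['customer_id'])
      @email_sender.send_order_email(to: customer[:email], first_name: customer[:first_name])
    end
    # All other event types are ignored.
  end
end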

This use case is an intentional action and manually triggered. It's not about disaster recovery, it's about having flexibility in the system to allow for satisfying emerging requirements without knowing them upfront.

2.3. Reactor → idempotence

In my opinion, a Reactor must take its actions in the following order:

  1. Trigger side effect (e.g. send email, interact with 3rd party API, etc).
  2. Emit event back to event stream.
  3. Update bookmark.

To explain why this matters, it's important to remember that Reactors are designed to stop and retry on failure. This means that on failure a Reactor will abort processing the event and shut down. When it comes back up again, it will attempt to re-process the same event.

The triggering of a side effect is usually the action most prone to errors, so that should go first. Then, we have a choice, which effectively boils down to what would happen if the last action fails. If the last action is to emit the event and that fails, it means we'll have updated the bookmark and so the next time we run the reactor we'd move on to the next event in the event stream without having emitted an event. If the last action is to update the bookmark and that fails, then the next time we run the reactor we will trigger the side effect and emit the event again. It's effectively an at-least-once vs at-most-once situation, and I'll opt for at-least-once in most situations (including this one).
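
A tiny sketch of that ordering (the method and argument names here are illustrative, not the library's API):

# Illustrative only: the order of the three actions, using injected callables.
def handle(event, send_email:, emit_event:, update_bookmark:)
  send_email.call(event)      # 1. trigger the side effect (most failure-prone, so first)
  emit_event.call(event)      # 2. emit the resulting event back to the event stream
  update_bookmark.call(event) # 3. advance the bookmark last

  # If step 3 fails, a retry repeats steps 1 and 2: at-least-once.
  # Updating the bookmark before emitting would give at-most-once instead.
end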

Assuming that these three actions cannot happen in a transaction (they can't), I do not know how to make Reactors fully idempotent. You may be smarter than me and have figured this out, in which case I'd 🙇 and would ❤️ to hear more!

Example: The Reactor that sends order emails could send the email, but when emitting the order_email_sent event back to the event store, something unexpected happens and an exception is raised. In this case, the Reactor would die and restart without having emitted the event or updated the bookmark. It would attempt to re-process the same event again, which means it would send the email again. Hence, the Reactor is not idempotent.

Another example: The same Reactor sends the email, emits the event, but then suffers from an unexpected failure in the form of an exception while attempting to update the bookmark. In this case, when the Reactor restarts it would now send the email and emit the event again. If the exception is raised again, we could potentially get stuck in an infinite loop where we attempt to process the same event over and over, sending N emails and emitting N events. The Reactor is not idempotent.

As I said, I don't think it's possible to make Reactors fully idempotent. But it is possible to approach idempotence without ever reaching it. If actions 1 and 2 above have happened--which means there's an event in the event stream noting that the side effect has been triggered--then the Reactor can still recover from not having updated its bookmark.

In other words: if the event is emitted, the Reactor can be made idempotent.

3. Solutions

Here are the solutions I've seen discussed, and how I interpret them (and their pros and cons):

3.1. Set a manual re-processing flag

A simple solution I've heard mentioned is to manually set a flag when starting a Reactor. This flag would prevent the Reactor from triggering side effects or emitting events while it's processing the event stream.
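
In sketch form (the REPLAY flag and the helper names are hypothetical):

# Hypothetical flag-based guard; REPLAY and the helper names are illustrative.
REPLAYING = ENV['REPLAY'] == 'true'

def process(event)
  update_internal_projection(event) # internal state is always safe to rebuild

  return if REPLAYING               # flag set: skip side effects and emission

  trigger_side_effect(event)
  emit_event(event)
end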

Pros

Cons

3.2. Use causation_id to find the last emitted event

This solution has been mentioned by @stevehodgkiss for some time. By setting an explicit causation_id for every event emitted by a Reactor (we already do this, but it's in the event body), a simple query to the event store would allow us to very quickly find the last event that was emitted by a particular Reactor. The query would look something like this:

SELECT causation_id -- the last known event the reactor processed
FROM events
WHERE type IN ('...', '...') -- whatever event types the reactor emits
ORDER BY sequence DESC -- reverse order of events, in some implementations this is called `id`
LIMIT 1 -- we only need the last one

The Reactor would then be able to set its bookmark to its "last known position" as given by this query, effectively skipping a number of events in the event stream.
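
A sketch of how a Reactor might use that query on startup. This assumes a Sequel connection to the event store; the table, column and event type names are illustrative, and as noted above the sequence column may be called id in some implementations:

require 'sequel'

DB = Sequel.connect(ENV.fetch('EVENT_STORE_URL')) # e.g. a postgres:// URL (illustrative)

# Find the last event this reactor emitted; its causation_id points at the
# last event the reactor fully actioned.
last_actioned_id = DB[:events]
  .where(type: %w[order_email_sent]) # whatever event types this reactor emits (illustrative)
  .reverse(:sequence)
  .get(:causation_id)

# Fast-forward the bookmark so already-actioned events are skipped on replay.
bookmark = last_actioned_id || 0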

Pros

Cons

3.3. Hot standby/replica

I saw this mentioned in the Slack channel. From my understanding, it's effectively a DR solution that involves having a database replica of wherever the tracking/bookmark table is stored, so that if we lose the data we can quickly get it back.

I view this as a preventative measure and not a solution to the use cases above, but figured I'd include it anyway as it's been floated.

Pros

Cons

3.4. "Clutch"

Ahh, the famous Clutch. I've discussed this previously in many conversations, but if you're unfamiliar, I'll quickly recap. For a lengthier description, please see this book draft that some smart people have been working on. The relevant section starts on page 54, and the meaty bit is on page 58.

When a Reactor starts up in "clutch down mode," it's like pushing down the clutch on a car. You can push the gas pedal all you want, but you ain't moving nowhere. Hence the "clutch" metaphor. What it means in practical terms is this:

  1. The Reactor starts up in "clutch down mode."

  2. The Reactor processes the event stream as per normal, but instead of triggering side effects and emitting events, it pushes those events and side effects onto some form of list (usually a hash). Think of this as:

@list[event_to_be_emitted] = side_effect_to_be_triggered
  3. The Reactor continues processing the event stream until it encounters an event of a type it can emit. It takes that event and sees whether it's in the list:
@list.key?(event_it_just_encountered_in_the_stream)

3.a. If it is, the Reactor removes it:

if @list.key?(event_it_just_encountered_in_the_stream)
  @list.delete(event_it_just_encountered_in_the_stream)
end
  4. The Reactor continues processing the stream, adding events it should emit to the list and then removing them as it encounters those events in the stream.

  5. Once it reaches the end of the stream, the Reactor checks to see what's left in the list. If there are events (and side effects) there, it means the Reactor should execute those but hasn't yet, and so it executes them. This is also called "releasing the clutch."

@list.each do |event_to_be_emitted, side_effect_to_be_triggered|
  side_effect_to_be_triggered.call
  EventSink.sink(event_to_be_emitted)
end
  6. The Reactor continues processing the stream, with the clutch up (so nothing gets added to the list--events are emitted and side effects triggered straight away).
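
Putting the steps together, a minimal sketch of the clutch. The class, helpers and event matching are illustrative only (matching is hand-waved here just as it is in the description above); only EventSink.sink is taken from the snippet above:

# Minimal clutch sketch; class and helper names are hypothetical.
class ClutchedReactor
  def initialize
    @clutch_down = true
    @list = {} # event_to_be_emitted => side_effect_to_be_triggered
  end

  def process(event)
    if emits_type?(event.type) && @list.key?(event)
      # We've caught up to an event we previously emitted; drop it from the
      # list so it isn't emitted (or actioned) a second time.
      @list.delete(event)
    elsif reacts_to?(event.type)
      pending_event, side_effect = build_reaction(event)
      if @clutch_down
        @list[pending_event] = side_effect # defer while replaying
      else
        side_effect.call
        EventSink.sink(pending_event)
      end
    end
  end

  # Called once the end of the stream is reached ("releasing the clutch").
  def release_clutch
    @list.each do |pending_event, side_effect|
      side_effect.call
      EventSink.sink(pending_event)
    end
    @list.clear
    @clutch_down = false
  end
end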

Pros

Cons

(My) Conclusion

I don't think it's any secret that I'm a fan of 3.4 (clutch). I think it's the most intuitive solution, particularly since I believe Reactors should always be started in "clutch down mode." In doing so, you get automatic replay safety for every Reactor out of the box.

I can also see 3.2 (causation_id) as a possible one, but it would require a fair amount of messing around. At every start, the Reactor would need to figure out whether it's replaying or not, and set flags and behave accordingly.

3.4 (clutch), meanwhile, automatically assumes it's replaying every time it starts. Assuming we can solve the performance issues, that to me feels like the safer choice. I also think it's simpler, although the difficulty I seem to be having explaining it somewhat contradicts that belief. 😆

In any case, I'd love to hear more thoughts and ideas, as well as feedback. If I've misunderstood or mischaracterised something, please don't hesitate to let me know. 🙂

stevehodgkiss commented 7 years ago

Thanks for taking the time to write all that down @vonconrad! Some thoughts:

Warren, Odin and I had a chat about this today since they're working on this at the minute. We ended up settling on an approach that resembles 3.2 because it offers a more efficient way to figure out the last actioned event ID for a given Reactor. The approach looks something like this (correct me if I'm wrong @warrenseen @twe4ked):

I don't like the clutch approach for the following reasons:

In other words: if the event is emitted, the Reactor can be made idempotent.

Yep, and with 3.2 we could modify the above logic (where a Reactor starts up and has a tracker entry in the db) to verify that the last actioned ID is actually what the event store has on file, or simply use that value. It might not even need to be stored in the tracker table: if the query to the event store is indexed and fast, the Reactor could simply grab that value on startup, keep it in memory and work from the last processed event ID value that we have now.
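
A rough sketch of that startup flow (all names here, like last_actioned_event_id, are illustrative rather than an agreed interface):

# Illustrative start-up flow for approach 3.2; names are hypothetical.
def start
  bookmark = tracker.last_processed_event_id
  # Queried from the event store (or tracker) once on start-up, kept in memory.
  last_actioned_event_id = query_last_actioned_event_id

  event_stream.subscribe_from(bookmark + 1) do |event|
    replaying = event.sequence <= last_actioned_event_id
    handle(event, skip_side_effects: replaying) # don't action/emit until past the last actioned event
    tracker.update(event.sequence)
  end
end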

mjward commented 7 years ago

Stellar write-up @vonconrad! 💯 I'm stoked we (you) have documented this with examples, as it was a PIA to load all of this from memory every time the topic was raised.

My immediate thoughts are in line with @stevehodgkiss around solution 3.2. This maps closely to the solution that I had rattling around in my mind after discussions with Orien and Simon. Storing the name of the Reactor that caused the event is the last piece of the puzzle that crosses off the concern where multiple Reactors emit the same event.

I see this solution addressing use cases 2.1 & 2.2 you mention. Use case 2.3 is its own kettle of fish, which I've also struggled to work out how to address.

The "clutch" is a clever metaphor I just prefer the simplicity of the last_actioned_event_id being restored on Reactor startup. Less moving parts.

Very happy to hear we are actively working on implementing a solution to this problem.

For additional context, the scenario where this occurs in Payables (different Reactors emit the same event) is around marking a Payment as succeeded or failed. Payoneer payments happen synchronously whereas Paypal is asynchronous. Both of these are handled by a different Reactor.

vonconrad commented 7 years ago

Thanks for taking the time to write all that down @vonconrad!

Thanks Steve. 😊

The reactor is modified to not emit events unless it's past the last actioned event. So if the last_processed/seen event ID is less than the last actioned event ID, it's essentially rebuilding the Reactor's projection.

This should address use case 2.2, if I'm understanding it correctly.

[I don't like the clutch approach because] a deploy means that a reactor is not keeping up to date with the latest events in the stream that it needs to react to because it's started from 0 in clutch down mode going through x million events for a few hours until it reaches the end of the stream again.

I don't understand how you'd jump to the conclusion that a Reactor would start from 0. To my way of thinking, here's how it'd work in the normal case:

  1. The Reactor starts.
  2. The Reactor checks the tracking table and finds its bookmark.
  3. The Reactor asks the Event Store for the current_sequence.
  4. If bookmark < current_sequence, the Reactor starts processing events in "clutch down mode" until it reaches current_sequence, at which point it releases the clutch.
  5. The Reactor continues to process new events with the clutch up.

Because the Reactor is starting from bookmark in every case, the only way it'd go back to 0 is if the bookmark has been lost (use case 2.1) or if it's been manually reset (use case 2.2). That would effectively amount to the bookmark being set to 0, either intentionally or by accident/disaster.
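
In sketch form (all names here are illustrative, not an agreed interface):

# Illustrative clutch start-up; method and variable names are hypothetical.
def start
  bookmark = tracker.last_processed_event_id  # 0 only after a reset or a lost tracker
  current_sequence = event_store.current_sequence

  clutch_down if bookmark < current_sequence  # catching up, so defer side effects/emission

  event_stream.subscribe_from(bookmark + 1) do |event|
    process(event)
    release_clutch if clutch_down? && event.sequence >= current_sequence
  end
end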

In most cases, though, this should be really quick.

I agree that solution 3.2 would be faster for use case 2.1 but don't believe it would be significantly faster for use case 2.2 as you need to go through the entire event stream anyway.

If the Reactor has been unavailable for a significant amount of time (hours? days?) then it will take some time for it to catch up. However, that's going to be the case no matter which solution we choose.

Yep, and with 3.2 we could modify the above logic (where a Reactor starts up and has a tracker entry in the db), to verify that the last actioned ID is actually what the event store has on file, that or simply use that value... it might not even need to be stored in the tracker table if the query to the event store is indexed and fast, it could simply grab that value on startup, keep it in memory and work from the last processed event ID value that we have now.

Makes sense. :+1:

Stellar write-up @vonconrad! 💯 I'm stoked we (you) have documented this with examples, as it was a PIA to load all of this from memory every time the topic was raised.

Thanks Matt! 😊

Storing the name of the Reactor that caused the event is the last piece of the puzzle that crosses off the concern where multiple Reactors emit the same event.

Yeah, this was the last piece of the puzzle for me too.

However, the more I think about this, the less I like it. Storing the name of the Reactor that emitted the event against the event so that it's queryable feels icky to me. We're now starting to conflate the responsibilities of the event store with downstream code organisation concepts. By storing the origin of the event in the event store, the event store rapidly becomes aware of the downstream. To me, that doesn't feel like something the event store should be doing.

In general, I think this is a key reason why I prefer "the clutch" over 3.2. With "the clutch," the act of figuring out what a Reactor has and hasn't done is solely a Reactor concern. It's up to the Reactor itself to make itself safe to replay. The event store doesn't know, or care, how this is done--or that it even is being done.

With 3.2, the event store becomes a crucial collaborator in this process. That feels wrong and I instinctively dislike that quite a bit. Introducing several new first class concepts to the event store for the single purpose of optimising downstream processes is not what I consider reasonable. By introducing new columns on the events table and a new queryable interface solely to solve this problem, we're effectively coupling the implementation of our event store to the implementation of our downstream processes. That, to me, looks on the tin like a--gasp!--cyclic dependency. 😆

Personally, I prefer my concerns separated and my event store decoupled from my downstream subsystems. 😉 This is why "the clutch" appeals to me, because it puts the sole responsibility for solving the problem squarely where it belongs--in the Reactor itself.

stevehodgkiss commented 7 years ago

I don't understand how you'd jump to the conclusion that a Reactor would start from 0

That was my assumption based on how it used to work and conversations I'd had on the topic recently.

However, the more I think about this, the less I like it. Storing the name of the Reactor that emitted the event against the event so that it's queryable, feels icky to me. We're now starting to conflate the responsibilities of the event store with downstream code organisation concepts. By storing the origin of the event in the event store, it means that it is rapidly becoming aware of the downstream. To me, that doesn't feel like something the event store should be doing.

With 3.2, the event store becomes a crucial collaborator in this process. That feels wrong and I instinctively dislike that quite a bit. Introducing several new first class concepts to the event store for the single purpose of optimising downstream processes is not what I consider reasonable. By introducing new columns on the events table and a new queryable interface solely to solve this problem, we're effectively coupling the implementation of our event store to the implementation of our downstream processes. That, to me, looks on the tin like a--gasp!--cyclic dependency. 😆

The event store can store metadata about the event. For example, we've talked about adding things like the git_sha of the application revision that emitted the event. Does that mean we have a cyclic dependency between git and the event store? 😆 The event store never reaches out to downstream systems, it simply provides the ability to store metadata. It doesn't know what any of that metadata means.

The specific implementation Warren, Odin and I talked about was adding a metadata jsonb column to the events table, where this, correlation_id, git_sha and any other metadata about the event that the application cares about can be stored. Adding an index to make the "driven by" data queryable is a pragmatic optimisation for this use case. We could alternatively replay the whole event stream in clutch mode but look at the event metadata instead of the event body data to figure out the last actioned event, but I don't see why we would want to do that given we could get the response in < 1 second via a query vs the time it takes to walk millions of events in the stream (worst case). We could make this a projection, but that requires an always up-to-date projection, and in a DR scenario that won't be the case.
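
For illustration, the metadata being described might look something like this when an event is stored (the keys and values are examples only, not a settled schema):

# Example event metadata (illustrative keys/values, not a settled schema).
metadata = {
  correlation_id: 'uuid-of-the-originating-command-or-event',
  causation_id:   'uuid-of-the-event-this-reactor-reacted-to',
  processor_name: 'send_order_email', # "driven by": which ESP emitted this event
  git_sha:        'abc1234'           # application revision that emitted the event
}

# Stored in a jsonb column on the events table; an expression index keeps the
# "driven by" lookup fast, e.g. (Postgres, illustrative):
#   CREATE INDEX events_metadata_processor_name_idx
#     ON events ((metadata->>'processor_name'));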

In general, I think this is a key reason why I prefer "the clutch" over 3.2. With "the clutch," the act of figuring out what a Reactor has and hasn't done is solely a Reactor concern. It's up to the Reactor itself to make itself safe to replay. The event store doesn't know, or care, how this is done--or that it even is being done.

The same is true if the event store stores the necessary values needed by the reactor as event metadata, vs storing that same data inside the event body hash. The reactor still needs to go to the event store and find the event it needs to start actioning from.

mjward commented 7 years ago

Hmmmmmmmm

System requirements do evolve over time, as does the design of a system. Who is to say that the Reactor emitting a side effect now will be the same Reactor forever? What happens when we split the responsibilities of Reactors and the logic that causes a side effect moves into a different Reactor (with a different name)? Or if we decide to simply rename a Reactor to give it a more descriptive name for the process it is performing?

It does feel rather brittle to bake that fact into the Event metadata and force Reactors to have knowledge of every Reactor that came before it that possibly, at one stage, emitted the same event type.

What happens when that side effect logic is not in the Reactor itself, but in a Service object (collaborator) used from a Reactor? The lines become less clear and keeping track of this could easily become an afterthought.

The more I think about this I start to see lots of places where this might accidentally trip you up. The absolute very last thing you want is to replay events and unknowingly trigger side effects as you won't know straight away unless something spectacularly blows up.

There is a very good possibility that your users or stakeholders may be the ones to bring it to your attention... this is not a good indicator that we are in control of our systems and, to be honest, would be very embarrassing.

warrenseen commented 7 years ago

I've made no secret of the fact that I'm interested in the DR aspects of this feature, and I have no philosophical horse in this race. Given that, I would tend towards the approach which allows me to reconstruct the current internal state of a reactor with the least amount of data.

@vonconrad @stevehodgkiss FWIW, we got to a partially workable implementation of 3.2 pretty quickly yesterday. It's not quite PR-ready, but next week @twe4ked and I can share that and we can have some more concrete discussions around what that implementation looks like. There are some practical concerns I have too, the most obvious being "What if I want to rename this reactor?"

@mjward re:

What happens when that side effect logic is not in the Reactor itself, but in a Service object (collaborator) used from a Reactor? The lines become less clear and keeping track of this could easily become an afterthought.

My understanding is that you'd mitigate that by emitting an event to indicate that the side effect was actioned (i.e Figure 1.7.3 in the link to the book draft ^^^)

The final thing that I would say is that whilst I am all for eschewing "magic" in libraries/platforms/stacks, I do wonder if it's dangerous to push ALL the responsibility of making the reactor replayable back on the implementors without providing some guidance, be it through the reactor API or some other method, in HOW to make reactors that are compatible with whatever approach we settle on? 🤔

mjward commented 7 years ago

My understanding is that you'd mitigate that by emitting an event to indicate that the side effect was actioned (i.e Figure 1.7.3 in the link to the book draft ^^^)

Apologies @warrenseen, I could have been a little clearer. I was more referring to the considerations you would need to make when multiple Reactors emit the same type of event, and the implications of using the reactor name to differentiate. I'd assume that if you ever actioned a side effect you'd always want to record that by emitting an event.

stevehodgkiss commented 7 years ago

Just to be clear, the tracker table uses MyReactorOrProjector.processor_name (which is a manually configured string in each ESP) to associate tracking state with the ESP. The class can be renamed without causing any problems related to tracking. If the processor_name value changes though, it'll be like you're kicking off a new ESP and it will start from 0.

In my mind this would be the value stored in event metadata along with the correlation ID/driven by event ID. It would have the same implications that we have now with respect to changing that value (change it and it starts from event 1, actioning events). I agree @mjward, accidentally changing that value without knowing its implications could be disastrous (just like it would be now).

mjward commented 7 years ago

Pull causation_id out of the event body so it's queryable (along with the reactor name that emitted it, I believe, since multiple reactors could potentially emit events from the same event type), and add last_actioned_event_id to the tracker state. The reactor is modified to not emit events unless it's past the last actioned event. So if the last_processed/seen event ID is less than the last actioned event ID, it's essentially rebuilding the Reactor's projection.

Maybe I am misunderstanding something (and please correct me if I am) but @stevehodgkiss you suggest that we store the processor_name in the Event metadata. When a Reactor starts up, if there is no tracker entry, it performs a query to determine the last_actioned_event_id. It performs this lookup by inspecting the last event that matches the event types that it itself emits as well as filtering based on its own processor_name at the time.

Let's say this has been running for 6 months and all is working OK. However, you now decide this Reactor is actually doing too much (multiple concerns) and it would benefit from being split into two.

So you move some of Reactor1 code into a new Reactor2 which includes the handling of an event which produces a side effect.

Start up the new Reactor2... it tries to find all events that match its emitted event types as well as its own processor_name... oops, there are none, so its last_actioned_event_id is set to 0, which will result in all the events it processes being "actionable", which will in turn cause side effects to be replayed.

Why did this happen? Because the processor_name in the Event metadata only points to the Reactor that produced the Event at the point in time it occurred. But things change, and this is just an arbitrary string identifier, so it's not necessarily a reliable piece of information for inferring what will produce the same side effect in the future.

Therefore my underlying question is: what if the processor_name of the processor that triggers this side effect changes over time? The Event metadata will then be stale, pointing to something that we cannot reliably use.

Shit solution: when it filters based on processor_name, it could filter on a list of processor names (formerly known as), but it feels wrong on many levels to burden a Reactor with the responsibility of knowing every processor that produced the same side effect in the past (let alone very hacky).

I also cringe at the suggestion that if a Reactor's name changes (it could well be just a conceptual name change--it still handles the same process), we could just update the class name but leave the processor_name the same. It's inaccurate and inconsistent naming of things like this that I try very hard to avoid.

stevehodgkiss commented 7 years ago

Maybe I am misunderstanding something (and please correct me if I am) but @stevehodgkiss you suggest that we store the processor_name in the Event metadata. When a Reactor starts up, if there is no tracker entry, it performs a query to determine the last_actioned_event_id. It performs this lookup by inspecting the last event that matches the event types that it itself emits as well as filtering based on its own processor_name at the time.

That's the idea. It could store the last actioned ID in the tracker table rather than querying the events metadata at startup (I just wanted to note that we have both options).

Start up the new Reactor2... it tries to find all events that match its emitted event types as well as its own processor_name... oops, there are none, so its last_actioned_event_id is set to 0, which will result in all the events it processes being "actionable", which will in turn cause side effects to be replayed.

The behaviour you're describing would be the same with the current master, 3.2 or 3.4. All tracking state is associated with the processor_name. If you make another reactor with a new processor name it will start from the beginning. To split a reactor into 2 from an existing reactor, you'd need to take the old one down then bring up the 2 new ones at the appropriate event sequence ID. None of that would change with what we're proposing here as far as I can tell at this stage.

mjward commented 7 years ago

💡 Gotcha! I was looking at the problem a little more broadly, forgetting this is purely focussed on the single Event Processor instance. Manual migration steps involving hackery of sequence IDs still remain if you want to rename or move logic around. Thanks for the clarification 🙇

I think I prefer that it's derived from the Events (if possible). I'm more relaxed knowing that the reactor will perform this check on startup each time rather than relying on the data that happens to be in the tracker table.