EventStore / replicator

Real-time replication tool
https://replicator.eventstore.org
Apache License 2.0

Idempotent ESDB Sink #35

Open kaancfidan opened 2 years ago

kaancfidan commented 2 years ago

Is your feature request related to a problem? Please describe.
When a replicator restarts from scratch for any reason (with lost state), it currently does not check whether the events were already published to the sink, so duplicate events can be inserted into the sink database.

Describe the solution you'd like
A version expectation could be specified when writing events to the sink database, so that any event duplication could be handled gracefully.

DEV-84

alexeyzimarev commented 2 years ago

That's known, but I am not sure how to solve it. Of course, if we disable the scavenge filter and any other filter, we can expect versions to match 100%. But if any filter is enabled, then tracking versions is challenging, unless the replicator uses some other place to keep track of the last written (and ignored) event version across all the streams it has replicated.

I am wondering whether a more reliable checkpoint store would be a way to mitigate the issue. Say we add a MongoDB checkpoint store, would it help?
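Roughly what I have in mind (a sketch only; the interface and collection name are made up for illustration, not the replicator's actual abstraction):

```csharp
// Illustrative sketch of an external checkpoint store backed by MongoDB.
// Not the replicator's actual code; names are placeholders.
using System.Threading.Tasks;
using MongoDB.Driver;

public record Checkpoint(string Id, long Position);

public interface ICheckpointStore {
    Task<long?> Load(string id);
    Task Store(string id, long position);
}

public class MongoCheckpointStore : ICheckpointStore {
    readonly IMongoCollection<Checkpoint> _checkpoints;

    public MongoCheckpointStore(IMongoDatabase database)
        => _checkpoints = database.GetCollection<Checkpoint>("checkpoints");

    public async Task<long?> Load(string id) {
        var doc = await _checkpoints.Find(x => x.Id == id).FirstOrDefaultAsync();
        return doc?.Position;
    }

    // Upsert: create the document on the first store, replace it afterwards.
    public Task Store(string id, long position)
        => _checkpoints.ReplaceOneAsync(
            x => x.Id == id,
            new Checkpoint(id, position),
            new ReplaceOptions { IsUpsert = true });
}
```

The point is only that the store lives outside the replicator's own deployment, so losing the replicator's volume does not lose the position.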

kaancfidan commented 2 years ago

I'm not sure about all the events in the $all stream, but for user streams, optimistic concurrency seems trivial to me (although here it's not about concurrency but about guarding against lost state).

Can't we make an exception for user streams so that you could guarantee there would be no corruption in user data?

Regarding the reliable checkpoint implementation: MongoDB would work better than simple file storage, but SQLite could be just as reliable (atomic changes, etc.) with a simpler deployment.
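To illustrate what I mean (just a sketch with Microsoft.Data.Sqlite, not meant as the actual implementation): a single UPSERT is atomic, so a killed process leaves either the old or the new position, never a half-written file.

```csharp
// Sketch of a SQLite-backed checkpoint; names and schema are illustrative only.
using Microsoft.Data.Sqlite;

public class SqliteCheckpointStore {
    readonly string _connectionString;

    public SqliteCheckpointStore(string path) {
        _connectionString = $"Data Source={path}";
        using var connection = new SqliteConnection(_connectionString);
        connection.Open();
        using var create = connection.CreateCommand();
        create.CommandText =
            "CREATE TABLE IF NOT EXISTS checkpoint (id TEXT PRIMARY KEY, position INTEGER)";
        create.ExecuteNonQuery();
    }

    public void Store(string id, long position) {
        using var connection = new SqliteConnection(_connectionString);
        connection.Open();
        using var upsert = connection.CreateCommand();
        // The whole statement either commits or rolls back, so the stored
        // position is never corrupted by an abrupt shutdown.
        upsert.CommandText =
            "INSERT INTO checkpoint (id, position) VALUES ($id, $pos) " +
            "ON CONFLICT(id) DO UPDATE SET position = $pos";
        upsert.Parameters.AddWithValue("$id", id);
        upsert.Parameters.AddWithValue("$pos", position);
        upsert.ExecuteNonQuery();
    }
}
```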

alexeyzimarev commented 2 years ago

SQLite would suffer from the same issue of losing its state. I mean using an external database, like MongoDB Atlas, or any database that is not part of the replicator deployment itself.

I am not sure how keeping track of recorded events for all the replicated streams is trivial. Yes, it is trivial to check on every write whether the event is already there, but it would totally destroy the performance.

kaancfidan commented 2 years ago

> SQLite would suffer from the same issue of losing its state. I mean using an external database, like MongoDB Atlas, or any database that is not part of the replicator deployment itself.

I don't think I lost the state because the disk (mounted volume) was lost. The volume was always there (at least its age, as read from the Kubernetes API, was older than the new pod), but the state file was somehow lost. I suspect it happened because the file was open when I killed the pod and it could not be recovered gracefully. I think SQLite would handle such a problem, if that's the case.

Don't get me wrong, MongoDB or any external database is a sure-fire solution. I was just suggesting a simplification. We could add a MongoDB deployment to the Helm charts in a few minutes and it would probably work.

> Yes, it is trivial to check on every write whether the event is already there, but it would totally destroy the performance.

Just to make sure we are on the same page: I wasn't suggesting first querying for the event and then writing if necessary. If we are using the good old "append to stream" method of the TCP client here, we could utilize the version expectation parameter in that call. If you think this version expectation on write would hinder write performance, you are probably more knowledgeable about that than I am.
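Something like this is what I meant (a rough sketch with the TCP client, not the replicator's code; `connection` and `evt` are placeholders):

```csharp
// Rough sketch: a single conditional append, no read-before-write.
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;

public static class ConditionalAppend {
    public static async Task Replicate(
        IEventStoreConnection connection, string stream,
        long sourceEventNumber, EventData evt) {
        // If the source event is stream-2:251, the sink stream should be at 250.
        // For the first event this is -1, i.e. ExpectedVersion.NoStream.
        var expectedVersion = sourceEventNumber - 1;

        try {
            await connection.AppendToStreamAsync(stream, expectedVersion, evt);
        }
        catch (WrongExpectedVersionException) {
            // The sink is already at or past this event: skip it instead of duplicating.
        }
    }
}
```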

kaancfidan commented 2 years ago

If you look at the incident in #36, there is a point where I try to delete the pod, which signals the replicator process to stop and wait for a graceful exit.

The replicator process hangs because it is waiting for the sink pipe to drain, which cannot happen as it can no longer connect to the sink database. Kubernetes then kills the process after the predefined graceful-termination timeout. The last log from the killed process is the following:

{"@t":"2022-01-04T13:36:21.4199286Z","@m":"Waiting for the sink pipe to exhaust (1000 left)...","@i":"06fcac33","Left":1000,"SourceContext":"EventStore.Replicator.Replicator"}

I suspect that this abrupt killing of the process made it lose its state file.

alexeyzimarev commented 2 years ago

> we could utilize the version expectation parameter in that call.

It doesn't need to be TCP; the same works with gRPC. What I am saying is the following.

Imagine now that we save the expected version for each stream to some external database. Still, there is a chance that we append an event to the sink but fail to write the version, as it's effectively a two-phase commit. It means we can still write an event twice.

Normally, duplicates are handled even without the expected version (with Any) if the replicator restarts quickly and the sink hasn't been restarted as well. That's because we ensure idempotent writes using the EventId while events are still in the cached chunk. I would guess (I can't check) that the size of the cache depends on the instance size in ES Cloud: the more RAM it has, the bigger the cache, so there is less chance of appending duplicates.
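In essence it looks like this (a simplified sketch with the gRPC client, not the actual replicator code):

```csharp
// Simplified sketch: reuse the source EventId so that, within the server's
// idempotence window, appending the same event again is deduplicated.
using System.Threading.Tasks;
using EventStore.Client;

public static class IdempotentAppend {
    static EventData MapEvent(ResolvedEvent source) =>
        new EventData(
            source.Event.EventId,       // same id as the source event, not a fresh Uuid
            source.Event.EventType,
            source.Event.Data,
            source.Event.Metadata);

    // With StreamState.Any the server deduplicates the write if an event with
    // the same EventId was appended recently (still cached), instead of storing it twice.
    public static Task Append(EventStoreClient sink, ResolvedEvent source) =>
        sink.AppendToStreamAsync(
            source.Event.EventStreamId, StreamState.Any, new[] { MapEvent(source) });
}
```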

But, of course, with 500K events it would never work. Getting some checkpoint would be useful. That's why I think it would be enough to provide an alternative storage option for the checkpoint.

As for the checkpoint file, it might be an option to back up the existing file and create a new file each time. It means that we'd have a normal file and a .bak file. When reading the checkpoint, I could try reading both.
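Something along these lines, perhaps (sketch only): write to a temporary file and swap it in with `File.Replace`, which keeps the previous checkpoint as the `.bak` copy; on load, fall back to the backup if the main file is gone or unreadable.

```csharp
// Sketch of a checkpoint file with a .bak fallback; not the actual implementation.
using System.IO;

public static class FileCheckpoint {
    public static void Store(string path, long position) {
        var tmp = path + ".tmp";
        File.WriteAllText(tmp, position.ToString());
        if (File.Exists(path))
            // Swap the new file in and keep the old checkpoint as .bak.
            File.Replace(tmp, path, path + ".bak");
        else
            File.Move(tmp, path);
    }

    public static long? Load(string path) {
        // Try the main file first, then fall back to the backup.
        foreach (var candidate in new[] { path, path + ".bak" }) {
            if (File.Exists(candidate) &&
                long.TryParse(File.ReadAllText(candidate), out var position))
                return position;
        }
        return null;
    }
}
```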

A more robust solution would be to read the target stream when doing the first write to the sink for a given stream. We have the stream metadata cache for all streams anyway, so it won't increase memory usage too much. From that moment on, it should be possible to know what the expected version should be.
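Roughly (a simplified sketch with the gRPC client): on the first write to a given sink stream, read one event backwards from the end of that stream; its number is the expected revision for the next append, and `StreamNotFound` means the append should expect no stream.

```csharp
// Sketch: find out where the sink stream currently is before the first write to it.
using System.Threading.Tasks;
using EventStore.Client;

public static class SinkRevision {
    public static async Task<StreamPosition?> GetLastEventNumber(
        EventStoreClient sink, string stream) {
        var result = sink.ReadStreamAsync(
            Direction.Backwards, stream, StreamPosition.End, maxCount: 1);

        if (await result.ReadState == ReadState.StreamNotFound)
            return null; // the stream does not exist in the sink yet

        await foreach (var resolved in result)
            return resolved.Event.EventNumber; // expected revision for the next append

        return null;
    }
}
```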

kaancfidan commented 2 years ago

I now understand the problem with user-defined filter functions and version expectation better.

Currently, I just want to copy all my events without any transformation or filtering; hence, in my case, when the replicator sees stream-2:251, it's trivial to expect stream-2 to be at event 250.

I think the use case of refactoring old event data clashes with basic data migration. It would complicate the design a bit, but handling these use cases differently might be the better path.

> As for the checkpoint file, it might be an option to back up the existing file and create a new file each time. It means that we'd have a normal file and a .bak file. When reading the checkpoint, I could try reading both.

Any small difference between the backup position and the actual sink position will create duplicate events. I would assume that trying to keep the backup file 100% consistent with the sink database might also hinder performance a bit. Not sure anyone would feel the difference, though.

I think I'm happy enough with your MongoDB checkpoint store suggestion.

alexeyzimarev commented 2 years ago

Have you noticed that the scavenge filter is enabled by default? Lots of customers struggle to migrate because some of the older v5 versions have scavenging issues. Also, scavenging is an IO-intensive operation, so a large scavenge could hinder live cluster performance. That's why I added the online scavenge filter to the replicator: to ignore rubbish data. I think it's relevant for a lot of people.

> Any small difference between the backup position and the actual sink position will create duplicate events.

Not really; check my previous comment about idempotent writes. If the gap is small (and the file checkpoint is stored every 1000 events), it will not produce duplicates unless the cluster restarts.

kaancfidan commented 2 years ago

I think I understand your point.

If you can guarantee that the position difference between the backup and the sink will be smaller than the Event Store's idempotence window, I also wouldn't expect any duplicate writes to happen.