Nebo15 / sage

A dependency-free tool to run distributed transactions in Elixir, inspired by Sagas pattern.
MIT License

RFC: Persist state across Sage executions #9

Open AndrewDryga opened 6 years ago

AndrewDryga commented 6 years ago

We can add a persistence adapter for Sage. The main goal of implementing it is to let users recover after node failures, making it possible to get better compensation guarantees. As I see it we need:

  1. Ability to use adapters (probably Ecto and RabbitMQ) to persist execution stages;
  2. Callbacks/Behaviour to define a loader that resumes persisted compensations that are not completed;
  3. We need to make sure that compensations are processed "exactly once" or "at least once" (we need to make a trade-off here).

This mechanism should be resilient to network errors, e.g. we should persist the state before executing a transaction, so that even if the node is restarted, compensation (even without knowing anything about the effect) would still start, potentially on another node.

michalmuskala commented 6 years ago

The simplest solution could be to use the :disk_log module - it powers mnesia transactions.

AndrewDryga commented 6 years ago

@michalmuskala I've read through :disk_log and it looks like it's not the best fit for this problem:

  1. There is no way to clean up old logs. We don't want them to grow infinitely, and it's safe to remove effects after Sage execution or compensation is completed. Otherwise, we would have lots of log entries which are never used.

  2. Running it in a distributed mode is not that trivial; even for this module it's recommended to have a wrapper process on each node that syncs local disk logs.

We need something with fast writes and deletes at the cost of slower reads, like Log-Structured Merge Trees. But I'm not aware of a good implementation which would not bring too much maintenance burden.

Any ideas?

AndrewDryga commented 6 years ago

Implementation can work something like this:

  1. We add a persistence adapter to the Sage interface.
  2. Before any transaction is executed, we persist the fact that we started the execution and the current state (so if a node fails before a transaction succeeds or an async task is spawned, we would still run compensations for them).
  3. After a transaction is executed, we update the persisted state to include the created effect.
  4. The same applies to compensations, but we update the state only when a compensation succeeds. We must not assume that an effect is compensated just because we executed the compensation.
  5. After Sage execution is finished (either successfully or after compensating all effects), we delete the persisted data.
  6. Sage.Executor should provide a function to run all compensations for a given Sage from a saved checkpoint.
  7. We need to implement a simple adapter which persists Saga state to a file and restarts compensations when the node is started.
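
A minimal sketch of the checkpointing order in steps 2 to 5, under stated assumptions: none of these modules exist in Sage, the `CheckpointSketch.*` names and callbacks are hypothetical, and an ETS table stands in for durable storage only to keep the example self-contained.

```elixir
defmodule CheckpointSketch.Adapter do
  # Hypothetical behaviour; Sage does not define it.
  @callback put(id :: term(), state :: map()) :: :ok
  @callback get(id :: term()) :: {:ok, map()} | :error
  @callback delete(id :: term()) :: :ok
end

defmodule CheckpointSketch.ETSAdapter do
  # In-memory stand-in; a real adapter would write to disk or a database.
  @behaviour CheckpointSketch.Adapter
  @table :checkpoint_sketch

  def init do
    if :ets.whereis(@table) == :undefined do
      :ets.new(@table, [:named_table, :public, :set])
    end

    :ok
  end

  @impl true
  def put(id, state) do
    true = :ets.insert(@table, {id, state})
    :ok
  end

  @impl true
  def get(id) do
    case :ets.lookup(@table, id) do
      [{^id, state}] -> {:ok, state}
      [] -> :error
    end
  end

  @impl true
  def delete(id) do
    true = :ets.delete(@table, id)
    :ok
  end
end

defmodule CheckpointSketch.Runner do
  # `state` is expected to look like %{started: [...], effects: [...]}.
  def run_step(adapter, id, name, state, fun) do
    # Step 2: record that the step started before running it.
    started = %{state | started: [name | state.started]}
    :ok = adapter.put(id, started)

    case fun.() do
      {:ok, effect} ->
        # Step 3: record the created effect after the step succeeds.
        done = %{started | effects: [{name, effect} | started.effects]}
        :ok = adapter.put(id, done)
        {:ok, done}

      {:error, reason} ->
        # The checkpoint still lists the step as started, so a loader
        # can compensate it without knowing the effect.
        {:error, reason, started}
    end
  end

  # Step 5: drop the checkpoint once the saga has finished.
  def finish(adapter, id), do: adapter.delete(id)
end
```

After a restart, a loader could read whatever entries remain and run compensations for every name in `started`, since a started step may or may not have produced an effect.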

We should probably remove the compensation error handler, since it looks obsolete given the ability to persist and retry compensations after a critical failure.

Questions:

  1. How to continue the execution on another node? The Sage struct includes anonymous functions and MFA tuples, so it's possible that there would be no code to handle compensations on a target node.

michalmuskala commented 6 years ago

I think we should keep it all local - at least in the first iteration. I don't think the distribution of the log (or transaction persistence) would actually be that useful.

The simple adapter could just use a dets table - it might not be the best tool for the job, but it's a no-dependencies solution that should allow validating the idea.
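
A rough sketch of what such a dets-backed store could look like. The module name, table name, and function names are made up for illustration; only the `:dets` calls are real OTP functions.

```elixir
defmodule DetsCheckpointSketch do
  # Hypothetical local checkpoint store backed by a single dets file.
  @table :sage_checkpoints_sketch

  # Opens (or creates) the table file; dets expects a charlist file name.
  def open(path) do
    :dets.open_file(@table, file: String.to_charlist(path), type: :set)
  end

  # Stores the latest state for a saga id, replacing any previous entry.
  def save(id, state), do: :dets.insert(@table, {id, state})

  def load(id) do
    case :dets.lookup(@table, id) do
      [{^id, state}] -> {:ok, state}
      [] -> :error
    end
  end

  # Removes the entry once the saga has finished or been compensated.
  def delete(id), do: :dets.delete(@table, id)

  # Lists all sagas that still have a checkpoint, e.g. on node start.
  def pending do
    :dets.foldl(fn entry, acc -> [entry | acc] end, [], @table)
  end

  def close, do: :dets.close(@table)
end
```

On node start, `pending/0` would return the sagas whose checkpoints were never deleted, which are the candidates for resumed compensation.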

Lastly, saving funs might be a problem - any code change would basically make it irrecoverable (since serialisation for funs includes the md5 hash of the module). Maybe the persistence feature should only work with mfa tuples?
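
To illustrate the MFA option, here is a small sketch in which a compensation is stored as a `{module, function, args}` tuple and invoked after being read back. `MfaSketch` and `refund/2` are hypothetical, and prepending the runtime arguments is an assumption of this sketch, not necessarily how Sage calls its callbacks.

```elixir
defmodule MfaSketch do
  # Hypothetical compensation callback.
  def refund(effect, reason), do: {:refunded, effect, reason}

  # An MFA tuple holds only atoms and plain terms, so the binary
  # refers to the function by name rather than by compiled code.
  def dump({mod, fun, args} = mfa)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    :erlang.term_to_binary(mfa)
  end

  # Restores the tuple and calls it, with runtime arguments placed
  # before the stored ones.
  def run(binary, runtime_args) do
    {mod, fun, args} = :erlang.binary_to_term(binary)
    apply(mod, fun, runtime_args ++ args)
  end
end
```

Because the stored term names the function instead of embedding it, it can still be applied after the module is recompiled, as long as a function with that name and arity is exported.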

AndrewDryga commented 6 years ago

@michalmuskala A Sage which relies on MFA tuples instead of callbacks in the form &do_something/3 is so much harder to read :(. I'll think more about it.

AndrewDryga commented 6 years ago

If we further extend persistence with an external transaction ID, we would be able to trigger recovery externally by executing the same Sage with the same ID and attributes, which would provide the idempotency feature described in #2.

AndrewDryga commented 6 years ago

We can use the LSM DB from NHS: https://github.com/martinsumner/leveled

keathley commented 6 years ago

> I think we should keep it all local - at least in the first iteration. I don't think the distribution of the log (or transaction persistence) would actually be that useful.

I think this is fine for an initial pass but I'm not sure this is a correct default to have long term. It would be problematic for our uses. But as long as persistence is behind an adapter as described everything should be fine since we can swap in our own persistence.

Thanks for this library and the thought you're putting into it :+1:.

deepankar-j commented 4 years ago

@AndrewDryga, is there any update on this issue? Thanks!

AndrewDryga commented 4 years ago

Hey @deepankar-j. I'm not spending much time adding features to Sage, since it fits our use cases as it is now. So if anyone wants it, this one is up for grabs.

deepankar-j commented 4 years ago

Thanks @AndrewDryga. Out of curiosity, how do you handle deployments for your system while a saga is in the middle of executing?

We have a three-node cluster on which we do a rolling update. This is why I'm interested in the execution log and saga recovery. However, I imagine that updating in the middle of a saga execution might not be as big of an issue for blue-green deployments.

chulkilee commented 4 years ago

I think the goal is to handle process crash - not just node failure...

In my opinion this shouldn't be special to sage. For example, the whole context can be rehydrated before calling sage execution (or at the first step of sage), and all the steps should be able to handle that.

In my case we keep everything in the database, and each sage step is idempotent given the information available in the database - e.g. when we know it's already done (e.g. an attr for an external resource is already set), the execution step is a no-op.

AndrewDryga commented 4 years ago

@deepankar-j we currently don't handle it as it's pretty rare for our system. It's the same as not handling node crash before Sage was introduced, but with Sage's semantics code is much easier to understand.

Also, if your functions are idempotent (eg. you persist some state between steps with an upsert operation) the saga would be persistent without any handling from the library.
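
A minimal sketch of such an idempotent step, assuming a plain map stands in for the database and `create_fun` stands in for the call to an external service. All names here are hypothetical.

```elixir
defmodule IdempotentStepSketch do
  # `db` is a map keyed by order id, standing in for a database table.
  # Returns {:ok, subscription_id, updated_db}.
  def create_subscription(db, order_id, create_fun) do
    case Map.fetch(db, order_id) do
      {:ok, %{subscription_id: id}} when not is_nil(id) ->
        # The effect was already recorded, so re-running is a no-op.
        {:ok, id, db}

      _ ->
        id = create_fun.()
        # Upsert: record the effect so a retry can detect it.
        {:ok, id, Map.put(db, order_id, %{subscription_id: id})}
    end
  end
end
```

Running the step twice with the same `order_id` calls `create_fun` only once. A crash between the external call and the write can still repeat the call, so the external service needs its own deduplication for that window.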

cjbottaro commented 2 years ago

Ha, I'm implementing my own distributed transaction system and started to think "I have to be reinventing the wheel here", so I found this cool project!

My system runs in Kubernetes, so pods are very frequently killed, restarted, deleted, etc., and I must persist my transaction state. Currently my system uses Redis and Jetstream: write the transaction to Redis, enqueue a Jetstream message, and have consumers process the message and update the transaction state in Redis. Notification of transaction/step values is all done with pub/sub (which comes built in with Jetstream).

Jetstream handles work-queue type stuff... retries, acks, etc.

Each step in a transaction must be idempotent, because the system has at-least-once delivery semantics.

Anyway, long winded way to say... maybe there should be a behaviour that specifies how to store/load transaction state, that way people can use whatever storage system they want?

AndrewDryga commented 2 years ago

@cjbottaro there is no interface right now but the idea was to introduce such behavior and then move the state into a separate module that would be a default implementation. If somebody has a practical need for this a PR would be welcome.

Keep in mind that persisting state is not as easy as it sounds. For example, you need some sort of two-phase commit for every execution step to recover from a failure where execution has finished but the node is restarted before Sage has persisted the state.

cjbottaro commented 2 years ago

> Keep in mind that persisting state is not as easy as it sounds. For example, you need some sort of two-phase commit

Yes, but even then, I think two-phase commit relies on something being 100% reliable (i.e. the file system), which doesn't jibe with networked/distributed systems.

This problem is not fun at all. https://en.wikipedia.org/wiki/Two_Generals%27_Problem

I'd love to brainstorm on this. My current thoughts are around "eventual consistency" systems... but that depends on detecting inconsistencies.

cc @AndrewDryga

AndrewDryga commented 2 years ago

@cjbottaro the thing I would focus on is making sure that when your node receives SIGTERM it can stop gracefully: cut the traffic to it and give it time to finish processing sagas.

This way your k8s deployment would be a bit slower, but you won't have sagas stopped midway (unless the node crashes or runs out of memory, which is rare). I feel like this approach would be more practical than solving distributed systems issues.

a3kov commented 1 year ago

Can use khepri for this