Papooch / nestjs-cls

A continuation-local storage (async context) module compatible with NestJS's dependency injection.
https://papooch.github.io/nestjs-cls/
MIT License
453 stars 29 forks source link

Potential nest-js cls plugin: `@Rollbackable()` #176

Closed drakedeatonuk closed 1 month ago

drakedeatonuk commented 1 month ago

In the project I'm working on, we use a Rollback class that we wrap around function returns in order to manage multi-transaction rollbacks across multiple third party services (e.g. db api, payment processor api, customer support api, etc...).

This is roughly how the rollback logic works:

export class Rollback {
  private readonly _rollbacks: Array<() => PromiseLike<unknown>> = [];

  get rollbacks(): Array<() => PromiseLike<unknown>> {
    return this._rollbacks;
  }

  push<T>(fn: () => PromiseLike<T>): void {
    this._rollbacks.push(fn);
  }

  async rollback(): Promise<void> {
    for await (const rollback of this._rollbacks) {
      try {
        await rollback();
      } catch (e) {
        console.log(e);
      }
    }
  }
}

export type Rollbackable<T> = {
  res: T;
  rb: Rollback;
};

export const withRollback = async <T>(fn: (r: Rollback) => Promise<T>): Promise<Rollbackable<T>> => {
  const rollback = new Rollback();
  try {
    const res = await fn(rb);
    return { res, rb };
  } catch (e) {
    await rb.rollback();
    throw e;
  }
};

With the above structures, you can essentially manage quite complex rollbacks across many different layers.

Here is an example of how the above structures can be used to manage rollbacks across multiple third party services:

class UserSvc {

  constructor(
    private UserDbSvc: UserDbSvc,
    private UserThirdPartySvc: UserThirdPartySvc,
  ){}

  async createUser() {
    const rollback = new Rollback();
    try {
      const dbCreate = await UserDbSvc.create();
      rollback.push(dbCreate.rb.rollbacks)      

      const thirdPartyCreate = await UserThirdPartySvc.create();
      rollback.push(thirdPartyCreate.rb.rollbacks);      

  }  catch (e) (
      // when exec is called, it executes the rollback logic that both services provided
      rollback.exec();
    }
  }

}

class UserDbSvc {

  constructor(
    private DbSvc: DbSvc,
  ) {}

  async create(): Rollbackable<void> {
     return withRollback(async rollback => {
       const newUser = await this.DbSvc.user.create();

       // add any rollback logic needed at the time that the C/U/D operation occurrs
       rollback.push(() => this.Db.user.delete(newUser))
    });
  }

}

As a nest-js/cli plugin, the above logic would be greatly simplified:

class UserSvc {

  constructor(

    // the nestjs-cls plugin
    private readonly RollbackHost: RollbackHost,

    // the third party applications which need rollback logic managed
    private UserDbSvc: UserDbSvc,
    private UserThirdPartySvc: UserThirdPartySvc,
  ){}

  // the use of a @Rollbackable could function similarly to @Transaction 
  @Rollbackable()
  async createUser() {
    try {
      await UserDbSvc.create();
      await UserThirdPartySvc.create();
    }  catch (e) (
       /// .... a distinction from @Transaction would be that to execute the rollbacks, a rollback call would be required
      this.RollbackHost.rollback();
    }
  }

}

class UserDbSvc {

  constructor(
    private RollbackHost: RollbackHost,
    private DbSvc: DbSvc
  ) {}

  async create() {
     const newUser = await this.DbSvc.user.create();
     await this.RollbackHost.push(() => this.Db.user.delete(newUser))
  }

}

To summarise, how this plugin would be used:

  1. wrap a function in the Rollbackable decorator
  2. inject the RollbackHost into any services with methods used within the Rollbackable-wrapped method.
  3. manually add the rollback logic within those nested method-calls.
  4. execute the rollback logic when needed by calling RollbackHost.rollback within the Rollbackable-wrapped method.

...

To summarise how this plugin differs from the Transactional plugin. In this plugin:

  1. rollback logic could be used to execute rollbacks across multiple third party apis.
  2. rollback logic is added manually by the dev to AsyncLocalStorage when C/U/D operations occur.
  3. rollback logic is triggered manually by the dev.

Any feedback on feasibility or ergonomics would be greatly appreciated :)

Papooch commented 1 month ago

Hi, thank you for the suggestion, but I see that you are trying to implement distributed transactions without a shared coordinator. There is no guarantee that the rollback will ever be called, or that all the rollbacks will be called (the app can crash in the meantime). There is multiple ways in which you can end up with inconsistent data.

That alone is the reason I won't consider adding a plugin like this, but feel free to implement it yourself with the existing plugin as a reference.

Btw, if you're working with 3rd party APIs, you would definitely benefit from the "Outbox" pattern.

drakedeatonuk commented 1 month ago

Right gotchya, that makes a lot of sense. Thanks very much for the feedback. Spent some time reading up on the outbox pattern today. I can see the benefits of the approach over my current solution. Much more robust.... Looks like I've got some work to do!

πŸ™

drakedeatonuk commented 2 weeks ago

@Papooch I've been spending more time looking into the Outbox pattern as I'm keen to try to use it to implement rollbacks across different third party services in a more robust way than the approach I proposed previously.

If you've got a couple minutes at any point I'd love to know what you think of this approach:

Overview

Schema

Outbox

outboxEventId - unique row id workflowId - a unique identifier for the stack in which the workflow is occurring eventType - the name of the event to emit for this workflow step rollbackEventType - the name of the event to emit in the case of rolling back this outbox event rollbackArgs - the args to pass when calling the rollbackEventType hasWorkflowFailed - false if no sep in the workflow has failed, true if any has. createdOn updatedAt

Example Usage

class WorkflowSvc {
  constructor () { }

  async run(
    args: T
  ) {

      const dbCreate = await UserDbSvc.create();

      const thirdPartyCreate = await UserThirdPartySvc.create();

      const transactionEnder = await this.OutboxSvc.complete()

  }
}

class UserDbSvc {

  constructor(
    private OutboxSvc: OutboxSvc,
    private DbSvc: DbSvc
  ) {}

  async create() {
     return this.DbSvc.user.create()
       .then(user => 
      this.OutboxSvc.create({
            eventType: 'user.created',
        rollbackEventType: 'user.delete',
            rollbackPayload: { where: { userId: newUser.userId } },
    })
      )
      .catch(e => this.OutboxSvc.fail())
  }

}

class OutboxSvc {

  constructor(
    private DbSvc: DbSvc,
    private ClsSvc: ClsSvc<object>
  ) {}

  /**
   *  creates a new outbox entry with a 'stackId' which links all outbox transactions that occurr in the same workflow
   */
  async create(args) {
     return this.Db.outbox.create({
       data: {
         ...args,
         stackId: this.ClsSvc.get("stackId")
       }
     })
  }

  /**
   *  ends an set of transactions by deleting all outbox transactions that match the current workflow's stackId
   */ 
  async complete() {
     return this.Db.outbox.deleteMany({
       where: {
         stackId: this.ClsSvc.get("stackId")
       }
     })
  }

  /**
   *  ends an set of transactions by deleting all outbox transactions that match the current workflow's stackId
   */ 
  async fail() {
     return this.Db.outbox.updateMany({
       where: {
         stackId: this.ClsSvc.get("stackId")
       },
       data: {
         hasWorkflowFailed: true
       }
     })
  }

}

OutboxListener {

  // poll the DB table periodically
  // if any rows are marked as "hasWorkflowFailed == true", rollback workflow based on 'createdOn' times

}

This is quite a simple example but it should provide the gist of the approach. Hopefully I have made the logic clear enough. I appreciate it's not strictly the outbox pattern, but it certainly borrows from it.

There's potentially scope to make a cls plugin that could abstract away some of the outbox logic, I am thinking.

Anyways - if you have some minutes, would love to hear what you think! No worries if not.

πŸ™

Papooch commented 2 weeks ago

Hi, yes, it makes sense. Although this is not definitely an Outbox pattern, it's much closer to the "Saga" pattern (or sometimes called a Long-Running Action (LRA)). A Saga instance stores state of a single workflow execution.

Your "rollback events" are what is commonly referred to as "compensating actions" in the BASE world - an action that undoes an earlier change, until which the sytem state can be inconsistent (as opposed to ACID, where each state change must be atomic and consistent).

Before you go ahead and reinvent the entire wheel, have a look at https://temporal.io/, which implements this kind of pattern.

drakedeatonuk commented 2 weeks ago

Very cool stuff @Papooch appreciate the insight.

I've found an example of the Saga pattern with compensating actions using Temporal: https://github.com/temporalio/samples-typescript/blob/main/saga/src/workflows.ts . I'll have a crack at adding a workflow into my nestjs monolith.

Will be some months before I have the time to tackle this but may let you know how I get on down the road! Thanks again πŸ™