nestjs / cqrs

A lightweight CQRS module for Nest framework (node.js) :balloon:
https://nestjs.com
MIT License

optionally await EventBus publish #134

Closed cyoharry closed 5 years ago

cyoharry commented 5 years ago

I'm submitting a...


[ ] Regression
[ ] Bug report
[X] Feature request
[X] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

eventBus.publish(new Event()) does not return anything, but it would be a nice feature if it could act like commandBus.execute() and allow you to await asynchronous operations within a REST route.

Desired behavior

A common complaint I've heard/read about the CQRS system is that it's a lot of boilerplate for apps that don't necessarily need it. For apps that do, I'll say that it seems like an amazing feature! Making use of events directly, though, is a likely first stage for moving bits of an application in this direction. Additionally, when developing independent modules, it can be a helpful approach to emit events so that more specific business implementations can simply hook into those events to perform follow-on tasks.

For example, a UserModule may have basic features for handling users without the specifics of the business needs. In order to add additional features to the application, typically, additional modules can simply depend on UserModule and use alternate endpoints for more specific business actions. This works in most cases, but there are scenarios where a developer would want to do something on createUser, but before the user creation route returns. Making use of a hooks-like pattern for this solves the isolated module problem by allowing another module (like TaskModule) to perform an action when the user is created (like attaching a default task to the user), but before the REST response returns. This may be a poor example, but I'd be happy to elaborate with more.

While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.

Thanks in advance. I'm just a developer trying to write clean code. Feel free to tell me I'm wrong if I'm way off base.

kamilmysliwiec commented 5 years ago

While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.

Events by nature have to be asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).
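
A rough sketch of the signal/trigger idea, for illustration only. To stay dependency-free it uses a plain Promise-based hub in place of an RxJS Subject, so it is a swapped-in technique rather than the RxJS version itself. SignalHub, UserCreatedEvent, correlationId, handleUserCreated, and createUserRoute are all hypothetical names and not part of @nestjs/cqrs.

```typescript
// Hypothetical stand-in for an RxJS Subject: lets one party wait for a
// signal that another party emits later, keyed by a correlation id.
class SignalHub {
  private readonly waiters = new Map<string, (value: string) => void>();

  // Called by the controller BEFORE publishing, so the signal cannot be missed.
  waitFor(correlationId: string): Promise<string> {
    return new Promise((resolve) => this.waiters.set(correlationId, resolve));
  }

  // Called by the event handler once its work is done.
  emit(correlationId: string, value: string): void {
    const resolve = this.waiters.get(correlationId);
    if (resolve) {
      this.waiters.delete(correlationId);
      resolve(value);
    }
  }
}

class UserCreatedEvent {
  constructor(readonly correlationId: string, readonly userId: string) {}
}

const hub = new SignalHub();

// Stand-in for an event handler: does asynchronous work, then signals.
async function handleUserCreated(event: UserCreatedEvent): Promise<void> {
  await new Promise((r) => setTimeout(r, 10)); // simulated async work
  hub.emit(event.correlationId, `default task attached to ${event.userId}`);
}

// Stand-in for a controller route: "publishes" fire-and-forget, then waits
// for the handler's signal before returning the response.
async function createUserRoute(userId: string): Promise<string> {
  const correlationId = `req-${userId}`;
  const done = hub.waitFor(correlationId);
  void handleUserCreated(new UserCreatedEvent(correlationId, userId));
  return done;
}
```

The waiter is registered before the event is published so that a fast handler cannot emit its signal before anyone is listening. A real route would probably also want a timeout, since a handler that never signals would leave the request hanging.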

cyoharry commented 5 years ago

Thanks for the advice! While using a subject would solve that problem, I'm looking for an easier-to-follow API. I've been looking for a way to avoid creating a hook system, which I'm totally willing to do, but I was hoping either an existing API or an approach to development would provide a solution. My ultimate goal is to write my modules in ways that allow them to be highly reusable as dynamic modules, while allowing modules that depend on them to inject custom business logic at specific events.

Thanks again, I'll keep trying approaches 👍

harryhorton commented 5 years ago

For anyone who runs into this issue: I wrote a simple Event-like hooks module with a similar usage to the EventBus. https://github.com/NestPack/hooks

kamilmysliwiec commented 5 years ago

I personally love this @Johnhhorton. Nice idea 👍

leolara commented 4 years ago

The problem with this is that you can shut down the application while async events are still being handled, so the handlers never finish executing.

This is happening to me while using nestjs from the command line, my script closes before all the event handlers have been executed.

I think there should be a way for you to wait for the completion of events. My events are events, I would prefer not to treat them as hooks.

theo-bittencourt commented 4 years ago

I have a problem similar to the one faced by @leolara.

In my case, I need to somehow wait for handlers to finish before closing the app in Jest e2e tests.

jarmokoivuniemi commented 4 years ago

I have a problem similar to the one faced by @leolara.

In my case, I need to somehow wait for handlers to finish before closing the app in Jest e2e tests.

Did you manage to solve this? I'm facing the same issue.

leolara commented 4 years ago

@jarmokoivuniemi @theo-bittencourt I think I ended up using the hooks library mentioned above, or something similar. It makes sense for the event bus to be async, but not if the process can be stopped before all handlers have run. The current implementation is not correct if it cannot guarantee that all handlers are going to be run before the process stops.

I do not mean that you have to be able to wait for them to be done; that is a different requirement, and perhaps it is not necessary for an event bus.

rhzs commented 3 years ago

Hi @kamilmysliwiec is there a chance to revisit this? I think it is a good idea to have this in NestJS. The equivalent concept that I can find is in Laravel Action library by Spatie https://github.com/spatie/laravel-queueable-action

It even has a retry mechanism.

bradsheppard commented 3 years ago

@cyoharry I recently forked the CQRS module and changed the AggregateRoot, EventBus and EventPublisher classes to support async methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.

sanderlissenburg commented 2 years ago

@cyoharry I recently forked the CQRS module and changed the AggregateRoot, EventBus and EventPublisher classes to support async methods (see here https://github.com/bradsheppard/nestjs-async-cqrs). I think this might help.

I tried your fork. But it still doesn't wait for the event(s) to finish. So my (cli) app still exits too soon.

bradsheppard commented 2 years ago

@sanderlissenburg If you can send me a reproducible example I'll take a look.

sanderlissenburg commented 2 years ago

@sanderlissenburg If you can send me a reproducible example I'll take a look.

Thank you for the kind offer. In the interest of time, I've chosen to drop the aggregate part and just call the repository from the command handler instead of the event handler, because aggregateObject->commit() is where the problem lies. That is still fire and forget, even with await in front of it. If I have time left, I may revisit this part and come back to you with a simple example.

ugudango commented 2 years ago

While the hook approach from @harryhorton is definitely more sound than what I'm about to suggest, I think this should be doable by including a resolve callback in the event constructor. My use case is batching some events for some bulk external API requests. So, in my case, I use this with sagas.

Here is a heavily stripped-down version of what I mean:

export class MyCustomEvent {
  constructor(
    public readonly remoteResolve: CallableFunction,
  ) {}
}

Then, when we publish events in a command handler:

// ...
// Resolves once the handler calls event.remoteResolve()
await new Promise((resolve) => {
  this.eventBus.publish(new MyCustomEvent(resolve));
});
// ...

And in the event handler/saga we could have something like this:

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';

@EventsHandler(MyCustomEvent)
export class MyCustomEventHandler implements IEventHandler<MyCustomEvent> {
  constructor() {}

  handle(event: MyCustomEvent) {
    // do logic
    event.remoteResolve();
  }
}

I can see why this would be violating some principles of coupling, but in my case, I didn't want to introduce a Redis queue with reducers.

rharutyunyan commented 2 years ago

While it may not be possible with your event architecture, it would be a nice feature if async event handlers could be awaited from eventBus.publish, or if a workaround could be established without the overhead of the full CQRS featureset.

Events by nature have to asynchronous. If you - for some reason - want to delay an HTTP response, you can use RxJS Subjects and "signals/triggers" (subscribe to a subject from the controller and emit a signal from the event handler).

Can you please let me know how exactly I can handle this? For example, I am triggering a command in a controller and want to wait until the event handler has finished its work. Any example would be appreciated.

Thanks

drewish commented 10 months ago

It's annoying that this is closed with no real response. This is probably the biggest shortcoming of this module.

Adrianvdh commented 8 months ago

Hi everyone. I've managed to solve this problem.

The TL;DR is don't use the NestJS CQRS EventBus, instead use the EventEmitter @nestjs/event-emitter.

Long Answer

@kamilmysliwiec is correct in saying that events by nature are async. This is the difference between eventual consistency and transactional consistency.

With eventual consistency, downstream event logic is handled asynchronously, possibly long after the user has gotten their HTTP response.

Transactional consistency is what most of you are looking for. It ensures that all the updates to the DB invoked by the system's command handlers and event handlers are atomic, meaning that you wrap your "command -> command handler -> event -> event handler -> command -> command handler -> ..." chain in one big transaction.

The difference between the two is in the name: with eventual consistency, the system will become consistent, eventually.

As for the concerns about error handling in event handlers after the user has gotten their response: the answer is to adopt your own retry mechanism. There are many ways to do this, but generally speaking you'll have to put a message broker between the publishing of events and the consuming of them. When an error occurs while consuming a message from the broker, the message is simply not ACK'ed, and the broker allows it to be retried.
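
To make the ACK-or-retry idea concrete, here is a minimal in-memory sketch. It is not a real broker client: InMemoryBroker, maxAttempts, deadLetters, and demo are hypothetical names used only for illustration, and a real broker would handle redelivery itself.

```typescript
type Handler<T> = (message: T) => Promise<void>;

// Hypothetical in-memory broker: a message leaves the queue only when the
// consumer finishes without throwing (the "ACK"); otherwise it is redelivered.
class InMemoryBroker<T> {
  private readonly queue: { body: T; attempts: number }[] = [];
  readonly deadLetters: T[] = [];

  constructor(private readonly maxAttempts = 3) {}

  publish(body: T): void {
    this.queue.push({ body, attempts: 0 });
  }

  // Delivers messages one by one until the queue is empty.
  async consume(handler: Handler<T>): Promise<void> {
    while (this.queue.length > 0) {
      const message = this.queue.shift()!;
      message.attempts += 1;
      try {
        await handler(message.body); // success: the message is ACK'ed (dropped)
      } catch {
        if (message.attempts < this.maxAttempts) {
          this.queue.push(message); // not ACK'ed: put back for a retry
        } else {
          this.deadLetters.push(message.body); // give up after maxAttempts
        }
      }
    }
  }
}

// Usage: a handler that fails twice with a transient error, then succeeds.
async function demo(): Promise<{ calls: number; deadLetters: number }> {
  const broker = new InMemoryBroker<string>(3);
  let calls = 0;
  broker.publish("UserCreatedEvent");
  await broker.consume(async () => {
    calls += 1;
    if (calls < 3) throw new Error("transient failure");
  });
  return { calls, deadLetters: broker.deadLetters.length };
}
```

The point of the sketch is that the handler never has to implement retries itself: throwing is enough to leave the message unacknowledged, and the retry policy lives in one place.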

See the nestjs-rest-cqrs-example repo for a concrete example of using NestJS CQRS with a message broker (AWS SQS) to add resilience and error handling.

Now, to solve everyone's issue with making the NestJS CQRS module transactionally consistent, I'll provide some snippets of code to get you started with synchronous event handling.

import { EventEmitter2 } from '@nestjs/event-emitter';
import { Injectable, Logger } from '@nestjs/common';
import { DomainEvent } from '@libs/ddd/DomainEvent';

@Injectable()
export class NestEventBusAdapter {
    constructor(
        private readonly eventEmitter: EventEmitter2,
    ) {
    }

    /**
     * Awaitable event emitter using EventEmitter2 provided by @nestjs/event-emitter.
     * emitAsync() returns the results of the listeners via Promise.all, so this
     * method resolves only after the listeners of every event have finished.
     * @see https://github.com/EventEmitter2/EventEmitter2#emitteremitasyncevent--eventns-arg1-arg2-
     * @see https://docs.nestjs.com/techniques/events
     */
    async sendEvent<TEvent extends DomainEvent>(events: TEvent[]): Promise<void> {
        await Promise.all(
            events.map((event) => {
                // FIXME: Fill in request-id
                Logger.debug(`[request-id] "${event.constructor.name}" event published`);
                // The class name is used as the event name; listeners must register with the same name.
                return this.eventEmitter.emitAsync(event.constructor.name, event);
            }),
        );
    }
}

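import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';

@Injectable()
@CommandHandler(RegisterUserCommand)
export class NestRegisterUserHandler implements ICommandHandler<RegisterUserCommand, User> {

    constructor(
        private readonly eventBus: NestEventBusAdapter,
    ) {
    }

    async execute(command: RegisterUserCommand): Promise<User> {
        // sendEvent() takes an array of events and resolves once their listeners are done.
        await this.eventBus.sendEvent([new UserCreatedEvent(...)])
        // (creating and returning the User is omitted in this snippet)
    }
}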

export abstract class DomainEvent {
    // The id is assigned here so that subclasses only need to pass it to super().
    protected constructor(public readonly id: string) {
    }
}

export class UserCreatedEvent extends DomainEvent {
    constructor(
        id: string,
        readonly email: string,
        readonly firstName: string,
        readonly lastName: string,
        readonly role: string,
    ) {
        // A derived class constructor must call super().
        super(id);
    }
}

// EventHandler.ts
import { DomainEvent } from '@libs/ddd/DomainEvent';
import { OnEvent } from '@nestjs/event-emitter';

/**
 * Wrapper around OnEvent() that provides the following options:
 * { suppressErrors: false, async: true, promisify: true }
 *
 * @see: https://docs.nestjs.com/techniques/events#listening-to-events
 */
export function EventHandler<T extends DomainEvent>(event: new (...args: never[]) => T): MethodDecorator {
    // OnEvent() already returns a MethodDecorator, so it can be returned directly.
    // The event name is the class name, matching what NestEventBusAdapter emits.
    return OnEvent(event.name, { suppressErrors: false, async: true, promisify: true });
}

// NestUserCreatedHandler.ts

@Injectable()
export class NestUserCreatedHandler {

    constructor(

    ) {}

    @EventHandler(UserCreatedEvent)
    async execute(event: UserCreatedEvent): Promise<string> {
        [...]
    }
}

Papooch commented 4 months ago

@kamilmysliwiec are you willing to die on the hill that the @nestjs/cqrs doesn't need awaitable events?

Being able to await the resolution of events is essential when building a durable system. Without that feature, the eventBus is by no means production ready because the app can crash at any time and we can't know if an event has been serviced or not (*).

Consider the outbox example:

  1. save aggregate to a database, along with all domain events to an outbox table
  2. outbox processor pulls unprocessed outbox messages
  3. for each outbox message, it converts it to the corresponding domain event and emits it using eventBus
  4. once the event is complete the message is marked as processed.

This flow is currently impossible using Events. That means only Commands and Queries can be used reliably. (btw, I don't need the result of the event, I just need to know it has been completed)
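
For illustration, steps 2 to 4 of the outbox flow might look roughly like this if an awaitable publish existed. OutboxMessage, PublishAndWait, and processOutbox are hypothetical names, and a plain array stands in for the outbox table; nothing here is part of @nestjs/cqrs.

```typescript
interface OutboxMessage {
  id: number;
  type: string;
  payload: string; // JSON-serialized event data
  processed: boolean;
}

// Hypothetical awaitable publish: assumed to resolve only after every
// handler of the event has finished, and to reject if one of them fails.
type PublishAndWait = (type: string, data: unknown) => Promise<void>;

// Pulls unprocessed messages, emits each one, and marks a message as
// processed only after its handlers have completed.
async function processOutbox(
  outbox: OutboxMessage[],
  publishAndWait: PublishAndWait,
): Promise<number> {
  let processedCount = 0;
  for (const message of outbox.filter((m) => !m.processed)) {
    try {
      await publishAndWait(message.type, JSON.parse(message.payload));
      message.processed = true; // reached only if no handler threw
      processedCount += 1;
    } catch {
      // Left unprocessed on purpose, so the next run picks it up again.
    }
  }
  return processedCount;
}
```

Without something to await, the `message.processed = true` line would run as soon as the event was handed to the bus, and a crash in between would lose the event.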

I know we can use a different package, like EventEmitter2 with the emitAsync method (and that's what I always do when implementing this pattern), but hopefully you can see that the ability to await events is needed in the real world.

Also please note that in C# .NET Core, the above workflow is very easy to do using MediatR (equivalent of @nestjs/cqrs) that is able to await events.

The Spring framework for Java has both synchronous events and a way of awaiting asynchronous ones (although with more magic, as is usual for Spring) using the @TransactionalEventListener annotation.

So why does something that is so simple in other frameworks need to be so complicated in NestJS?


(*) Unless, of course, we emit another event that notifies the original caller. But who needs that much boilerplate?

cesek-navisys commented 3 months ago

I cannot agree more with @Papooch. Why not get the best of both worlds and let developers choose what they need, by extending the EventBus class with async methods that carry an "Async" suffix? Does that make sense, @kamilmysliwiec?

    // Existing methods
    publish<TEvent extends EventBase, TContext = unknown>(event: TEvent, context?: TContext): any;
    publishAll<TEvent extends EventBase, TContext = unknown>(events: TEvent[], context?: TContext): any;

    // New async methods (no `async` keyword here: body-less signatures cannot carry it)
    publishAsync<TEvent extends EventBase, TContext = unknown>(event: TEvent, context?: TContext): Promise<any>;
    publishAllAsync<TEvent extends EventBase, TContext = unknown>(events: TEvent[], context?: TContext): Promise<any>;
