nestjs / docs.nestjs.com

The official documentation https://docs.nestjs.com 📕
MIT License

Expect sequence of events in saga #903

Open boenrobot opened 4 years ago

boenrobot commented 4 years ago

I'm submitting a...


[ ] Regression 
[ ] Bug report
[ ] Feature request
[x] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

The docs state for sagas that a saga "can combine, merge, filter [...] events streams."

Does that mean that a saga could trigger a command after a sequence of events? If so, how would one do that? How would one monitor different types of events, determine whether they're related to one another, and trigger the command only once two or more related events have appeared in the stream?

Expected behavior

There should be an example of combining two events into a stream and if possible, establishing a simple relation between them. More complex business logic (more events, more complex relations between the events) could extrapolate from that.

I would contribute to docs, if I actually knew the answers to the above questions.

Minimal reproduction of the problem with instructions

What is the motivation / use case for changing the behavior?

As hinted in issues like nestjs/cqrs#64, the distinction between a saga and an event handler is kind of murky... I for one thought that a saga differs exactly in that it can watch for related events, whereas an event handler is specifically for one event at a time.

If that's correct, the docs should give an example demonstrating this rather than simply an alternative to an event handler. If not correct, there should be a minimal example that illustrates where one should use a handler vs. where to use a saga.

Environment


Nest version: latest


For Tooling issues:
- Node version: latest  
- Platform: all 

Others:

imissyouso commented 4 years ago

any progress here?

MateuszNaKodach commented 4 years ago

@kamilmysliwiec What about this? To me, a Saga is just a simple event handler, except that it can process an Observable (event handlers are plainer in that respect, but a Saga doesn't provide any new functionality). It's not as complex as the Saga pattern from Domain-Driven Design, where we react to events and finish the process somewhere. I will be giving a presentation about Event Sourcing in which the examples use your framework, and I wanted to show both EventHandler and Saga, but to me it doesn't make sense to distinguish between them.

PS. In the video you also mention that we can persist the event in an event handler: https://youtu.be/c8hvW14VdkY?t=948 Maybe I'm missing something, but if we persist it in an event handler, is it possible for the event to be published without being persisted, which would lead to an inconsistent state?

boenrobot commented 4 years ago

The example at https://youtu.be/c8hvW14VdkY?t=1147 is an interesting one... Though it still doesn't look like it's any better than having something like

@EventsHandler(HeroSlayedDragonEvent, HeroFoundItemEvent)
class DropCoins implements IEventHandler<HeroSlayedDragonEvent | HeroFoundItemEvent> {

    public constructor(private commandBus: CommandBus) {}

    public handle(event: HeroSlayedDragonEvent | HeroFoundItemEvent) {
        if (Math.random() >= 0.3) {
            this.commandBus.execute(new DropCoinsCommand(event.heroId));
        }
    }
}

The required boilerplate is pretty much the same as in a saga, and either way, each occurrence of either event triggers the same code (in this case, a Math.random() call that decides whether a command is executed).

boenrobot commented 4 years ago

I stumbled upon the groupBy operator.

I guess that could be used as a starting point to group related events into separate streams, but I'm not sure how one could declare a group completed after all required events are in it, so that the resulting group is turned into an array, and given to one last observable that would use all events in that array to call a command.

I'm very bad at RxJS though...

Let's say the goal is to run the above code only if the same hero has first slain a dragon and then found an item. Killing two dragons and then finding one item must count as just one drop chance, not two, and slaying two dragons and then finding two items also counts as just one drop chance, not two. Finding an item and then killing a dragon doesn't count either.

Any RxJS ninjas around that can do this sort of thing?

I managed to reach:

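@Saga()
dropCoins = (events$: Observable<IEvent>) => {
  // The saga must return the stream of commands it produces.
  return events$.pipe(
    // Only these two event types are relevant (both carry a heroId).
    ofType(HeroSlayedDragonEvent, HeroFoundItemEvent),
    // One inner stream per hero.
    groupBy(ev => ev.heroId),
    // mergeMap flattens the per-hero command streams back into one stream.
    mergeMap(group =>
      // zip pairs the n-th dragon kill with the n-th item found.
      zip(
        group.pipe(ofType(HeroSlayedDragonEvent)),
        group.pipe(ofType(HeroFoundItemEvent)),
      ).pipe(
        // Skip the pair instead of emitting undefined.
        filter(() => Math.random() >= 0.3),
        map(() => new DropCoinsCommand(group.key)),
      ),
    ),
  );
}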

But that would trigger the drop chance twice for the sequence HeroSlayedDragonEvent, HeroSlayedDragonEvent, HeroFoundItemEvent, HeroFoundItemEvent

The first HeroSlayedDragonEvent is matched with the first HeroFoundItemEvent, and the second HeroSlayedDragonEvent is matched with the second HeroFoundItemEvent. I guess that's a legit scenario in other cases, so it could still be given in the docs as an example... It's just not really the scenario I was after :/ .
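For the stricter scenario described above (a dragon kill followed by an item find for the same hero, counted once), one option might be a small state machine instead of zip. The following is only a minimal sketch under assumptions: it has no RxJS or Nest dependency, the event classes are hypothetical stand-ins with assumed dragonId and itemId fields, and step, initialState and dropChances are illustrative names, not part of @nestjs/cqrs.

```typescript
// Hypothetical stand-ins for the event classes in this thread.
// The dragonId and itemId fields are assumptions made for illustration.
class HeroSlayedDragonEvent {
  constructor(public readonly heroId: string, public readonly dragonId: string) {}
}
class HeroFoundItemEvent {
  constructor(public readonly heroId: string, public readonly itemId: string) {}
}
type HeroEvent = HeroSlayedDragonEvent | HeroFoundItemEvent;

interface SagaState {
  // Heroes with a dragon kill that has not yet been followed by an item.
  armed: Set<string>;
  // heroId that earned a drop chance on the latest event, if any.
  trigger: string | null;
}

const initialState: SagaState = { armed: new Set<string>(), trigger: null };

// Reducer with the (accumulator, value) shape that RxJS scan expects.
function step(state: SagaState, event: HeroEvent): SagaState {
  const armed = new Set(state.armed); // copy, so the previous state is not mutated
  if (event instanceof HeroSlayedDragonEvent) {
    armed.add(event.heroId); // repeated kills still leave a single pending chance
    return { armed, trigger: null };
  }
  // An item only counts if the same hero has a pending dragon kill.
  const trigger = armed.delete(event.heroId) ? event.heroId : null;
  return { armed, trigger };
}

// Plain-array driver, used here only to exercise the reducer.
function dropChances(events: HeroEvent[]): string[] {
  const heroes: string[] = [];
  let state = initialState;
  for (const event of events) {
    state = step(state, event);
    if (state.trigger !== null) {
      heroes.push(state.trigger);
    }
  }
  return heroes;
}
```

Inside a saga, the same reducer could presumably be passed to scan(step, initialState) after ofType(...), followed by a filter on trigger !== null and a map to DropCoinsCommand (with the Math.random() check applied there). That wiring is untested here. Because the state is keyed by heroId, groupBy would not be needed in that variant.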

xdave commented 4 years ago

I came up with this... maybe it'll be useful for someone.

import { Type } from '@nestjs/common';
import { IEvent, ofType } from '@nestjs/cqrs';
import { pipe } from 'rxjs';
import { filter, groupBy, mergeMap, scan } from 'rxjs/operators';

export const scanEventsBy = <T extends IEvent>(key: keyof T, n: number) =>
    pipe(
        scan<T, Record<string, T>>(
            (events, event) =>
                Object.assign(events, {
                    [`${event[key]}`]: event,
                }),
            {},
        ),
        filter(events => Object.keys(events).length === n),
    );

export const groupEventsBy = <T extends IEvent>(
    key: keyof T,
    using: keyof T,
    ...eventTypes: Array<Type<T>>
) =>
    pipe(
        ofType(...eventTypes),
        groupBy(event => event[key]),
        mergeMap(group => group.pipe(scanEventsBy(using, eventTypes.length))),
    );

Example usage:

// Some events
export class TestEvent1 {
    eventName = this.constructor.name;
    aggregateId = 'test1';
}

export class TestEvent2 {
    eventName = this.constructor.name;
    aggregateId = 'test1';
}

@Injectable()
class SomeSagaClass {
    @Saga()
    test = (event$: Observable<IEvent>) =>
        event$.pipe(
            groupEventsBy('aggregateId', 'eventName', TestEvent1, TestEvent2),
            mergeMap(events => [
                new DoSomethingForEvent1Command(events.TestEvent1.aggregateId),
                new DoSomethingForEvent2Command(events.TestEvent2.aggregateId),
            ]),
        );
}

// Somewhere else, example
... this.eventBus.publishAll([new TestEvent1(), new TestEvent2()]);

brianorwhatever commented 3 years ago

wow thank you @xdave! battled rxjs all day and this worked great.

brianorwhatever commented 3 years ago

hmm only problem is that it thinks all of my events are of the first type given..

brianorwhatever commented 3 years ago

For anyone stumbling onto this issue: I had to explicitly cast each event back to my own event type, like this. If anyone knows of a way to clean this up, let me know! [image]

xdave commented 3 years ago

Yeah it's an inelegant solution, I haven't come up with anything else yet.
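One possible way to avoid the manual cast is a small lookup helper that narrows with a runtime instanceof check. This is a minimal sketch, not something from @nestjs/cqrs: pickEvent and Ctor are hypothetical names, Ctor is redeclared locally so the snippet has no dependencies, and the foo and bar fields are made up so the two classes differ. It assumes the record is keyed by class name, as in the helper above when grouped with 'eventName'.

```typescript
// Constructor type, similar in spirit to Type<T> from @nestjs/common
// (redeclared here so the sketch is self-contained).
type Ctor<T> = new (...args: any[]) => T;

// Looks up the event stored under the class name and narrows its type
// with a runtime check instead of a cast.
function pickEvent<T>(events: Record<string, unknown>, ctor: Ctor<T>): T {
  const event = events[ctor.name];
  if (!(event instanceof ctor)) {
    throw new Error(`No ${ctor.name} found in this group`);
  }
  return event;
}

// Events shaped like the ones in the example above; foo and bar are
// hypothetical fields added only to make the two types distinguishable.
class TestEvent1 {
  eventName = this.constructor.name;
  aggregateId = 'test1';
  foo = 'a';
}
class TestEvent2 {
  eventName = this.constructor.name;
  aggregateId = 'test1';
  bar = 2;
}

// Usage: the result is typed as TestEvent2, so .bar is available without a cast.
const grouped = { TestEvent1: new TestEvent1(), TestEvent2: new TestEvent2() };
const second = pickEvent(grouped, TestEvent2);
const barValue: number = second.bar;
```

Note that this relies on class names surviving the build; a minifier that renames classes would break the lookup by ctor.name.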

justinpenguin45 commented 1 year ago

Any updates on this, @kamilmysliwiec?