nestjs / nest

A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications with TypeScript/JavaScript 🚀
https://nestjs.com
MIT License
66.89k stars 7.56k forks source link

EventBus implementation for Reactor pattern #885

Closed xmlking closed 6 years ago

xmlking commented 6 years ago

I'm submitting a...


[ ] Regression 
[ ] Bug report
[x] Feature request
[ ] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

There is no easy way to call one service’s method from other service other then injecting service and calling directly

Expected behavior

We need a framowkt for @on('int.echo'), notify(‘int.echo’) style inter component communication. This will enable use to fire an event from controller and gateway react to it and send websocket message to client

Minimal reproduction of the problem with instructions

What is the motivation / use case for changing the behavior?

Grails/spring example Example

https://objectpartners.com/2015/10/22/asynchronous-programming-in-grails-3/

Environment


Nest version: X.Z


For Tooling issues:
- Node version: XX  
- Platform:  

Others:

whtiehack commented 6 years ago

@nestjs/cqrs

发自我的 iPhone

在 2018年7月17日,22:54,Sumanth Chinthagunta notifications@github.com 写道:

I'm submitting a...

[ ] Regression [ ] Bug report [x] Feature request [ ] Documentation issue or request [ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow. Current behavior

There is no easy way to call one service’s method from other service other then injecting service and calling directly

Expected behavior

We need a framowkt for @on('int.echo'), notify(‘int.echo’) style inter component communication. This will enable use to fire an event from controller and gateway react to it and send websocket message to client

Minimal reproduction of the problem with instructions

What is the motivation / use case for changing the behavior?

Grails/spring example Example

https://objectpartners.com/2015/10/22/asynchronous-programming-in-grails-3/

Environment

Nest version: X.Z

For Tooling issues:

  • Node version: XX
  • Platform:

Others:

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub, or mute the thread.

xmlking commented 6 years ago

@whtiehack thanks I need to explore how to make use of it. My case is simple. Here is what I am planing to implement: I have a generic websocketGateway that keep track of connected client sockets + user.

  1. When I have to send a message/object to an user form e.g, a controller/service , I inject WebsocketGateway and call send(user, dara) method on gateway.
  2. When WebsocketGateway receives a message from end user, based on message type , will publish apprirate command so that handler will execute business logic.
shekohex commented 6 years ago

What about using Event Emitter ? There is a built-in module in nodejs.

I will soon release a couple of modules to help with this with Decorators of course, and eventemitter is one of them

xmlking commented 6 years ago

@shekohex great idea. I will explore today and share my results

xmlking commented 6 years ago

Ended up with this implementation. I can now send action objects from UI to backend and vice versa.
@shekohex @kamilmysliwiec looking forward for your Decorators to reduce boilerplate code. this can be coupled with @amcdnl ngxs-websocket plugin eventbus.gateway.ts

import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  OnGatewayInit,
  SubscribeMessage,
  WebSocketGateway, WebSocketServer,
  WsResponse,
} from '@nestjs/websockets';
import { Observable, of } from 'rxjs';
import {EventEmitter} from 'events';
import {Logger, UseGuards} from '@nestjs/common';
import {delay} from 'rxjs/operators';
import { ISocket } from './interfaces/socket.interface';
import { Server } from 'socket.io';
import {getActionTypeFromInstance, actionMatcher} from '@ngxs/store';
import {AuthService, User, WsAuthGuard} from '../auth';

@WebSocketGateway({ namespace: 'eventbus'})
export class EventBusGateway extends EventEmitter implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(EventBusGateway.name);
  static EVENTS = 'events';
  static ACTIONS = 'actions';

  @WebSocketServer() server: Server;
  clients: ISocket[] = [];

  constructor(private authService: AuthService) {
    super();
  }

  public afterInit(server) {
  }

  public handleConnection(client: ISocket) {
    this.logger.log(`Client connected => ${client.id}  ${client.handshake.query.token}`);
    this.clients.push(client);
  }

  public handleDisconnect(client: ISocket) {
    this.logger.log(`Client disconnected => ${client.id}`);
    this.clients = this.clients.filter(c => c.id !== client.id);
  }

  @UseGuards(WsAuthGuard)
  @SubscribeMessage('auth')
  onAuthenticate(client: ISocket, [data, cb]) {
    const event = 'auth';
    if (cb) {
      // We will be sending via callback here.
      cb({status: 'success'});
    } else {
      return {event, status: 'success'};
    }
  }

  @SubscribeMessage('actions')
  onActions(client: ISocket, action: any) {
    this.logger.log(`test  => ${client.id}  ${client.user.userId}`);
    this.emit(action.type, action);
  }

  public sendActionToUser<T>(user: User, action: any): void {
    const clients = this.getSocketsForUser(user);
    const type = getActionTypeFromInstance(action);
    clients.forEach(socket => socket.emit(EventBusGateway.ACTIONS, { ...action, type }));
  }

  private getSocketsForUser(user: User): ISocket[] {
    return this.clients.filter(c => c.user && c.user.userId === user.userId);
  }
}

notification.service.ts

import {Injectable, OnModuleDestroy, OnModuleInit} from '@nestjs/common';
import {User} from 'auth';
import {EventBusGateway} from 'eventbus';
import {AddNotification, SeenNotification} from 'core';

@Injectable()
export class NotificationService implements OnModuleInit, OnModuleDestroy {

  async onModuleInit() {
    this.eventBus.on(AddNotification.type, this.addNotification.bind(this));
    this.eventBus.on(SeenNotification.type, this.seenNotification.bind(this));
  }
  onModuleDestroy() {
    this.eventBus.off(AddNotification.type, this.addNotification.bind(this));
    this.eventBus.off(SeenNotification.type, this.seenNotification.bind(this));
  }

  constructor(private readonly eventBus: EventBusGateway) {}

  sendNotification(user: User, action: any) {
    this.eventBus.sendActionToUser(user, action);
  }

  addNotification(action: AddNotification) {
    console.log('AddNotification', action);
  }

  seenNotification(action: SeenNotification) {
    console.log('SeenNotification', action);
  }
}

notification.controller.ts

import {Controller, Get, HttpCode, HttpStatus, Logger, Req} from '@nestjs/common';
import {CurrentUser} from '../auth';
import {NotificationService} from './notification.service';
import {AddNotification, SeenNotification} from 'core';

@Controller()
export class NotificationController {
  constructor(private readonly nService: NotificationService) {}

  @Get('/notifications')
  @HttpCode(HttpStatus.ACCEPTED)
  notifications(@Req() req, @CurrentUser() user) {
    this.nService.sendNotification(user, new AddNotification('test123'));
    this.nService.sendNotification(user, new SeenNotification('test321'));
  }
}

notification.actions.ts

// Actions
export class AddNotification {
  static readonly type = '[Notification] Add';
  constructor(public readonly payload: any) {}
}

export class SeenNotification {
  static readonly type = '[Notification] Seen';
  constructor(public readonly payload: any) {}
}
kamilmysliwiec commented 6 years ago

Thanks for sharing @xmlking. Another possible solution would be to use rxjs subjects.

lock[bot] commented 4 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.