nestjs / nest

A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications with TypeScript/JavaScript 🚀
https://nestjs.com
MIT License
67.69k stars 7.63k forks source link

@ConnectedSocket() decorator is giving wrong socket instance with custom ws-adapter #4176

Closed dekavox closed 4 years ago

dekavox commented 4 years ago

Bug Report

hi together. I'm quite new to nest.js and typescript in general but so far I really like the concept behind it. I've been stuck with this problem for a few days now and I really would appreciate some help with this. so far I isolated the error to the following point.

Current behavior

i created a custom websocket adapter based on the websocket/ws package. so far it seems to be working properly but if I use a subscription and use the @ConnectedSocket() (or use the syntax without decorators stated out in the documentation) I always get the socket instance that connected first to my server but not the socket which sent the message.

Input Code

ws-adapter-custom.ts

import * as WebSocket from 'ws';
import { MessageMappingProperties } from '@nestjs/websockets';
import { AbstractWsAdapter } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter, map } from 'rxjs/operators';

export class WsAdapterCustom extends AbstractWsAdapter {

  protected readonly httpServer: any;
  static wsServer;
  static handlers: MessageMappingProperties[];

  constructor(appOrHttpServer) {
    super(appOrHttpServer);

    WsAdapterCustom.wsServer = new WebSocket.Server({noServer: true})
  }

  // function that is called to create a new websocket-server instance
  create(port: number, options: any = {}): any {

    console.log('websocket-server created!');

    this.httpServer.on('upgrade',(request, socket, head)=>{

      if(request.headers.upgrade.toLowerCase() === 'websocket'){
          console.log('upgrade from http to websocket requested by client!')

          // upgrade to wsServer instance of responsible wsGateway
          WsAdapterCustom.wsServer.handleUpgrade(request, socket, head, (wsClient)=>{
              console.log('handleUpgrade called, "connection" will be emitted!');

              // generate binding from websocket to user here
              WsAdapterCustom.wsServer.emit('connection', wsClient, request);
          });
      }
      else{
          console.log('upgrade from http requested by client, but not to websocket!')
      }
    });

    console.log('http-server upgrade-callback-function registered!')

    WsAdapterCustom.wsServer.on('connection', this.clientConnect);

    return WsAdapterCustom.wsServer;
  }

  // function that is called to delete a websocket-server instance
  close(server) {
    server.close();
  }

  clientConnect(wsClient: any){
    console.log('client connected!');

    let wsClientSocket = wsClient._socket;
    console.log('client Socket remoteFamily: ',wsClientSocket.remoteFamily);
    console.log('client Socket remoteAddress: ',wsClientSocket.remoteAddress);
    console.log('client Socket remotePort: ',wsClientSocket.remotePort);

    wsClient.on('close', (wsClient) => {
      console.log('client disconnected!');
    });

  }

  bindMessageHandlers(
    wsClient: any,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ) {

    // bind text-message-handlers once for each client
    // for each new wsClient all subscribed callbacks are registere to to its "message" event
    // filtering is done by the "event" field of the message so only the matching subscription is called if a message arrives
    if(!wsClient.stringMessageHandlerRegistered){ // no event-handlers registered for clients-event "message"
      fromEvent(wsClient, 'message')
      .pipe(
        mergeMap(data => this.bindMessageHandler(data, handlers, process, wsClient)),
        filter(result => result),
      )
      .subscribe(response => wsClient.send(JSON.stringify(response)));

      wsClient.stringMessageHandlerRegistered = true;
      console.log('clients ',wsClient._socket.remotePort,' string-messages -> message-handlers (bound)!');
    };

    // add message handlers to static array
    if(!WsAdapterCustom.handlers){
      WsAdapterCustom.handlers = handlers;
    }
    else{
      WsAdapterCustom.handlers = WsAdapterCustom.handlers.concat(handlers);
    };

    handlers.forEach(element => {
      console.log('message-handler registered for message "',element.message,'" and client ',wsClient._socket.remotePort,'!');
    });

  }

  bindMessageHandler(
    buffer,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
    wsClient: any
  ): Observable<any> {

    // if message payload is string (op-code = 1)
    if(wsClient._receiver._opcode === 1){

      console.log('received string-message from client ',wsClient._socket.remotePort,'!');

      try{
        const message = JSON.parse(buffer.data);
        const messageHandler = WsAdapterCustom.handlers.find(
          handler => handler.message === message.event,
        );
        if (!messageHandler) {
          return EMPTY;
        }
        // console.log((process));
        return process(messageHandler.callback(message.data));
      }
      catch (_a) {
        return EMPTY;
      }

    }
    else{
      // no string payload
      return EMPTY;
    }
  }

}

message.gateway.ts

import {SubscribeMessage, WebSocketGateway, MessageBody, ConnectedSocket } from '@nestjs/websockets';
import {MessagesService} from './messages.service';
import WebSocket = require('ws');

@WebSocketGateway()
export class MessagesGateway {

  constructor(private readonly messagesService : MessagesService){};

  @SubscribeMessage('message-stream-request')
  messageStreamCreate(@MessageBody() data: string, @ConnectedSocket() wsClient: any){
  // messageStreamCreate(wsClientTest: any, data: string){

    console.log('received data at subscription "message-stream-request" in MessageGateway!');

    console.log('creating message for client: ',wsClient._socket.remotePort,' at message.gateway!')
    this.messagesService.createStreamingMessage(wsClient,'de-DE');

    wsClient.send(JSON.stringify({event: 'message-stream-grant'}));

  }

  @SubscribeMessage('message-stream-eof')
  messageStreamComplete(@MessageBody() data: string, @ConnectedSocket() wsClient: any){
  // messageStreamComplete(wsClient: any, data: string){

    console.log('');
    console.log('received data at subscription "message-stream-eof" in MessageGateway!');
    this.messagesService.eofStreamingMessage(wsClient);

  }

}

Expected behavior

the parameter wsClient of function messageStreamCreate should give back the proper socket instance of the sender of the message. instead the instance of the socket that connected first to my server is passed to the function (sockets were identified by the wsClient._socket.remotePort parameter)

Possible Solution

Environment


Nest version: 6.12.9

For Tooling issues:
- Node version: v10.16.3 
- Platform: Windows 10

Others:

kamilmysliwiec commented 4 years ago

Please, use our Discord channel (support) for such questions. We are using GitHub to track bugs, feature requests, and potential improvements.

dekavox commented 4 years ago

hi kamil, sorry for that, wasn't shure if this was a bug or not. thanks for the fast response. cheers

xxRockOnxx commented 3 years ago

This still exists.

Is this the expected result? how do you get the sender client?

dekavox commented 3 years ago

Yes, this still seem to exist and in my opinion this is clearly a bug.

When I found the bug I tried to dig into the nestJS modules but didn't really manage to find the cause of this problem. So I created this solution as a hack that works for me:

in ws-adapter-custom.ts

bindMessageHandler(
    buffer: any,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
    wsClient: any,
  ): Observable<any> {
    if (wsClient._receiver._opcode === 1) {
      try {
        const message = JSON.parse(buffer.data);
        const messageHandler = WsAdapterCustom.handlers.find(
          (handler) => handler.message === message.event,
        );
        if (!messageHandler) {
          return EMPTY;
        }
        // hack to include wsClient into parameter of message-callback
        // @ConnectedSocket() seems to give the wrong socket-instance at message-callback context
        const messageData: any = {
          wsClient,
          message,
        };
        return process(messageHandler.callback(messageData));
      } catch (_a) {
        return EMPTY;
      }
    } else {
      // no string payload
      return EMPTY;
    }
  }

in message.gateway.ts

  @SubscribeMessage('message-text')
  async receiveTextMessage(@MessageBody() messageBody: any) {

  let wsClient = messageBody.wsClient
  let wsMessage = messageBody.message

  }

Hope this solution works for you as well for now but ideally the @ConnectedSocket() decorator should work like intended and give the correct socket instance.

xxRockOnxx commented 3 years ago

I tried making a minimal reproduction on codesanbox as asked on Discord. It works for some reason.

Sandbox: https://codesandbox.io/s/pensive-lake-x9vfw

Steps:

I'm still investigating further if it's related to something else.

Cc: @kaupi1487

kodmanyagha commented 8 months ago

I think this is not a bug. If you use custom pipe than you must check the value in transform method. Because websocket client object is passing from pipe too.


// zod-validation-pipe.ts
@Injectable()
export class ZodValidationPipe implements PipeTransform {
  constructor(private schema: Schema) {}

  async transform(value: any): Promise<any> {
    console.log('>> 🚀 transform value:', typeof value, value);

    if (value instanceof WsClient) {
      return value;
    }
    // make your validation and return result in here...
}