nestjs / nest

A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications with TypeScript/JavaScript 🚀
https://nestjs.com
MIT License

Nest/microservice does not create an observable? #4775

Closed (r3m4k3 closed 4 years ago)

r3m4k3 commented 4 years ago

Hi guys,

I was trying to use streams with gRPC, but when I run the code below, it fails with "subscribe is not a function". It seems like @nestjs/microservices does not create an observable from the stream. Could you please publish an up-to-date example with gRPC? The method shown in the docs seems to be outdated.

My code - proto file:

    message Rating {
      string id = 1;
      uint32 value = 2;
      string comment = 3;
      string type = 4;
    }

    service RatingsRpcService {
      rpc Test (stream GetRatingRequest) returns (stream Rating);
    }

    message GetRatingRequest {
      string id = 1;
    }

    message Ratings {
      repeated Rating items = 1;
    }

And the controller file:

    @GrpcStreamMethod('RatingsRpcService')
    test(msg: Observable<any>, metadata: any): Observable<any> {
        const subject = new Subject();
        msg.subscribe({
            next: (dto: any) => {
                subject.next({lol: 'aa'});
            },
            error: (err: any) => console.log(err),
            complete: () => {}
        });

        return subject.asObservable();
    }

And the error I get: TypeError: msg.subscribe is not a function
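
When a handler fails with "subscribe is not a function", it can help to log what the argument actually is at runtime before assuming it is an Observable. The following is only a minimal sketch: `describeStreamArg` is a hypothetical helper, not part of NestJS or rxjs, and it relies on duck typing alone.

```typescript
// Hypothetical diagnostic helper (not a NestJS API): reports what kind of
// value a stream handler received, based on duck typing only.
function describeStreamArg(arg: unknown): string {
  if (arg !== null && typeof arg === 'object') {
    const candidate = arg as { subscribe?: unknown; on?: unknown };
    // Observables expose a subscribe() method.
    if (typeof candidate.subscribe === 'function') return 'observable-like';
    // Raw Node.js-style streams and emitters expose on().
    if (typeof candidate.on === 'function') return 'event-emitter-like';
    // Anything else is most likely a plain message or a transformed object.
    return `plain object with keys: ${Object.keys(arg).join(', ') || '(none)'}`;
  }
  return typeof arg;
}
```

Calling `console.log(describeStreamArg(msg))` as the first line of the handler would show whether `msg` still has a `subscribe` method when it reaches the method body.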

Versions:

    "@grpc/proto-loader": "^0.5.4",
    "@nestjs/core": "^7.0.0",
    "@nestjs/microservices": "^7.0.9"

Did I miss something?

r3m4k3 commented 4 years ago

Okay, I guess I found the issue. It's kinda strange, but changing the parameter type from Observable to any fixed the error. That is, I now declare messages: any instead of messages: Observable<any> or messages: Observable<RatingInterface>. Here is the code that works for me:

    @GrpcStreamMethod('RatingsRpcService')
    async findByIdStream(messages: any, metadata: any): Promise<Observable<RatingInterface>> {
        const subject = new Subject<RatingInterface>();
        messages.subscribe({
            next: async (dto: GetRatingDto) => {
                const item = await this.ratingsService.findById(dto.id);
                subject.next(item);
            },
            error: (err: any) => {
                throw new RpcException('Could not process stream.')
            },
            complete: () => subject.complete()
        });

        return subject.asObservable();
    }
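
One thing to watch in the handler above: because `next` is async, `complete` may run while a lookup is still pending, and an exception thrown inside the `error` callback does not reach the returned stream. Below is a rough sketch of one way to address both, assuming results should come out in request order. `relayWithLookup`, `MiniObserver`, and `MiniSubscribable` are hypothetical names: the two interfaces are local stand-ins for the rxjs shapes so the sketch runs without dependencies, and none of this is taken from the NestJS docs.

```typescript
// Minimal structural stand-ins for the rxjs observer/observable shapes.
interface MiniObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface MiniSubscribable<T> {
  subscribe(observer: MiniObserver<T>): void;
}

// Hypothetical helper: runs each request through an async lookup and only
// completes the output after every queued lookup has settled.
function relayWithLookup<Req, Res>(
  source: MiniSubscribable<Req>,
  lookup: (req: Req) => Promise<Res>,
  sink: MiniObserver<Res>,
): void {
  // Each step is chained onto the previous one, so output order matches input order.
  let tail: Promise<void> = Promise.resolve();
  let failed = false;

  source.subscribe({
    next: (req) => {
      tail = tail.then(async () => {
        if (failed) return;
        try {
          sink.next(await lookup(req));
        } catch (err) {
          // Forward the failure to the output instead of throwing.
          failed = true;
          sink.error(err);
        }
      });
    },
    error: (err) => {
      tail = tail.then(() => {
        if (!failed) {
          failed = true;
          sink.error(err);
        }
      });
    },
    complete: () => {
      // Queued behind all pending lookups, so nothing is cut off.
      tail = tail.then(() => {
        if (!failed) sink.complete();
      });
    },
  });
}
```

In the controller this would roughly correspond to `relayWithLookup(messages, (dto) => this.ratingsService.findById(dto.id), subject)`, since a Subject exposes `next`, `error`, and `complete`.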

kamilmysliwiec commented 4 years ago

Please, use our Discord channel (support) for such questions. We are using GitHub to track bugs, feature requests, and potential improvements.

r3m4k3 commented 4 years ago

I understand this, and I know the purpose of GH issues. However, it seems to me that this is either a bug or the official docs are outdated. Please read the thread once again before you answer.