nestjs / nest

A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications with TypeScript/JavaScript 🚀
https://nestjs.com
MIT License
67.29k stars 7.59k forks source link

How to build server streaming via grpc and nest.js? #6877

Closed muturgan closed 3 years ago

muturgan commented 3 years ago

Bug Report

can't subscribe a server grpc stream

Input Code

proto file:

syntax = "proto3";

package testtime;

service TimeService {    
    rpc GetTimeStream(Empty) returns (stream TimeStreamResponse);
}

message Empty {
}

message TimeStreamResponse {
    string result = 1;
}

controler

import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';
import moment from 'moment';
import { Observable, Subject } from 'rxjs';

const timeSubject = new Subject<{ result: string }>();
setInterval(() => {
    const result = moment().format('hh:mm');
    timeSubject.next({ result });
}, 5000);

@Controller()
export class TestTimeController {
    @GrpcMethod('testtime.TimeService', 'GetTimeStream')
    public getTimeStream(): Observable<{ result: string }> {
        return timeSubject.asObservable();
    }
}

Current behavior

when I try to call the method, I get an error:

/project/node_modules/@nestjs/microservices/server/server-grpc.js:141
             this.transformToObservable(await handler).subscribe(data => callback(null, data), (err) => callback(err));
                                                                         ^
 TypeError: callback is not a function
     at SafeSubscriber._next (/project/node_modules/@nestjs/microservices/server/server-grpc.js:141:73)
     at SafeSubscriber.__tryOrUnsub (/project/node_modules/rxjs/src/internal/Subscriber.ts:265:10)
     at SafeSubscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:207:14)
     at Subscriber._next (/project/node_modules/rxjs/src/internal/Subscriber.ts:139:22)
     at Subscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:99:12)
     at CatchSubscriber.Subscriber._next (/project/node_modules/rxjs/src/internal/Subscriber.ts:139:22)
     at CatchSubscriber.Subscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:99:12)
     at TapSubscriber._next (/project/node_modules/rxjs/src/internal/operators/tap.ts:125:22)
     at TapSubscriber.Subscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:99:12)
     at MergeMapSubscriber.notifyNext (/project/node_modules/rxjs/src/internal/operators/mergeMap.ts:162:22)
     at SimpleInnerSubscriber._next (/project/node_modules/rxjs/src/internal/innerSubscribe.ts:30:17)
     at SimpleInnerSubscriber.Subscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:99:12)
     at MergeMapSubscriber.notifyNext (/project/node_modules/rxjs/src/internal/operators/mergeMap.ts:162:22)
     at SimpleInnerSubscriber._next (/project/node_modules/rxjs/src/internal/innerSubscribe.ts:30:17)
     at SimpleInnerSubscriber.Subscriber.next (/project/node_modules/rxjs/src/internal/Subscriber.ts:99:12)
     at SwitchMapSubscriber.notifyNext (/project/node_modules/rxjs/src/internal/operators/switchMap.ts:166:24)

Expected behavior

it's possible to subscribe the stream

Environment

[System Information] OS Version : Linux 5.8 NodeJS Version : v14.16.1 NPM Version : v6.14.12

[Nest CLI] Nest CLI Version : 7.6.0

[Nest Platform Information] platform-express version : 7.6.15 microservices version : 7.6.15 schedule version : 0.4.3 typeorm version : 7.1.5 common version : 7.6.15 core version : 7.6.15

kamilmysliwiec commented 3 years ago

Please, use our Discord channel (support) for such questions. We are using GitHub to track bugs, feature requests, and potential improvements.

muturgan commented 3 years ago

The problem turned out to be that the packages from @opentelemetry patch the parsing of proto files. Unfortunately, I do not have a right to bring a show to the problem.