maxime-agusti closed this issue 3 years ago
This is related to how Kafka produces/consumes messages. For example, if you change this line:
return from(numbers);
in your ServerController class to this instead:
return from(numbers).pipe(concatMap((item) => of(item).pipe(delay(10))));
which adds a 10-millisecond delay to each emitted element, you'll receive a proper response:
curl http://localhost:3000/count/5
["i-1","i-2","i-3","i-4","i-5"]
In fact, the order in which NestJS produces messages is valid regardless of the delay:
(console.log added in the ServerKafka.ts class from the @nestjs/microservices package, line 193)
@mkaufmaner do you have any ideas on how we could prevent it?
@kamilmysliwiec @maximeag
Kamil, as you know from a recent PR I opened, I am working on refactoring the message pattern to account for previously assigned topics and partitions, to prevent problems with rebalancing. While doing this, I have also encountered this issue. I am still tracking it down, but I am pretty sure it has something to do with the fact that this.producer.send is async but the promise is not handled. This is because the abstract handler for sending messages or emitting events is not async. I believe this is because most transports don't provide an acknowledgement like Kafka does.
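To make the suspected hazard concrete, here is a minimal, dependency-free sketch. It is not the NestJS or kafkajs code: `fakeSend`, `sendUnordered`, `sendOrdered`, and the latencies are all hypothetical, chosen only to show that several async sends started without awaiting can complete in a different order than they were issued, while awaiting each one preserves the order.

```typescript
// Hypothetical stand-in for an async producer call (NOT the kafkajs API).
// Each message is "delivered" after its own artificial latency.
type Message = { value: string; latencyMs: number };

function fakeSend(msg: Message, delivered: string[]): Promise<void> {
  return new Promise((resolve) => {
    setTimeout(() => {
      delivered.push(msg.value);
      resolve();
    }, msg.latencyMs);
  });
}

// Fire-and-forget style: every send starts immediately and nothing waits
// for the previous one, so completion order depends on latency alone.
async function sendUnordered(messages: Message[]): Promise<string[]> {
  const delivered: string[] = [];
  const pending = messages.map((m) => fakeSend(m, delivered));
  await Promise.all(pending); // awaited here only so the result can be inspected
  return delivered;
}

// Awaited style: the next send starts only after the previous one resolved,
// so delivery order matches emission order.
async function sendOrdered(messages: Message[]): Promise<string[]> {
  const delivered: string[] = [];
  for (const m of messages) {
    await fakeSend(m, delivered);
  }
  return delivered;
}

// Made-up latencies: the first message is the slowest one.
const messages: Message[] = [
  { value: "i-1", latencyMs: 30 },
  { value: "i-2", latencyMs: 5 },
  { value: "i-3", latencyMs: 15 },
];

sendUnordered(messages).then((d) => console.log("fire-and-forget:", d));
sendOrdered(messages).then((d) => console.log("awaited:", d));
```

With these latencies the fire-and-forget variant delivers i-2, i-3, i-1, while the awaited variant delivers i-1, i-2, i-3. Whether this is exactly what happens inside ServerKafka is still an assumption at this point in the thread.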
Let's track this here https://github.com/nestjs/nest/pull/5654
Let's track this here https://github.com/nestjs/nest/pull/5915
Thank you very much, folks! You're the best 👍🏻
Bug Report
Current behavior
I'm experiencing a very strange behavior when returning an Observable from a @MessagePattern() handler in a NestJS microservice with a Kafka transport.
It seems that the second-to-last element of an Observable is (almost) never received by a client that calls a @MessagePattern() handler.
Input Code
I prepared a repo to easily reproduce the bug: https://github.com/maximeag/nestjs-kafka-based-microservices
It contains 2 NestJS apps:
When the app client receives a request on route GET /count/:max, it sends a request-response message to the Kafka topic count.
In this example, this.serverClient is an instance of ClientKafka.
On the other side, when the app server receives the request, it creates an array and sends the values as an Observable.
Here is what I got when I call the app client with max=5:
Value i-4 is missing.
And with max=10:
Value i-9 is missing.
Expected behavior
All values should be received by the client: